From 41914452e3a7472b7e758f87210fdcfc2c88e38a Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Fri, 31 Mar 2023 02:09:24 +0000 Subject: [PATCH] Restore static single threaded executor Signed-off-by: Michael Carroll --- .../static_executor_entities_collector.hpp | 357 ++++++++++++ .../static_single_threaded_executor.hpp | 223 ++++++++ rclcpp/include/rclcpp/wait_set_template.hpp | 1 - .../executor_entities_collection.cpp | 1 - .../executors/executor_notify_waitable.cpp | 2 - .../static_executor_entities_collector.cpp | 524 ++++++++++++++++++ .../static_single_threaded_executor.cpp | 280 ++++++++++ 7 files changed, 1384 insertions(+), 4 deletions(-) create mode 100644 rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp create mode 100644 rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp create mode 100644 rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp create mode 100644 rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp new file mode 100644 index 0000000000..f9fd2ff672 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -0,0 +1,357 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include +#include + +#include "rcl/guard_condition.h" +#include "rcl/wait.h" + +#include "rclcpp/experimental/executable_list.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ +typedef std::map> WeakCallbackGroupsToNodesMap; + +class StaticExecutorEntitiesCollector final + : public rclcpp::Waitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + StaticExecutorEntitiesCollector() = default; + + // Destructor + RCLCPP_PUBLIC + ~StaticExecutorEntitiesCollector(); + + /// Initialize StaticExecutorEntitiesCollector + /** + * \param p_wait_set A reference to the wait set to be used in the executor + * \param memory_strategy Shared pointer to the memory strategy to set. + * \throws std::runtime_error if memory strategy is null + */ + RCLCPP_PUBLIC + void + init( + rcl_wait_set_t * p_wait_set, + rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy); + + /// Finalize StaticExecutorEntitiesCollector to clear resources + RCLCPP_PUBLIC + bool + is_init() {return initialized_;} + + RCLCPP_PUBLIC + void + fini(); + + /// Execute the waitable. + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + /// Take the data so that it can be consumed with `execute`. + /** + * For `StaticExecutorEntitiesCollector`, this always return `nullptr`. + * \sa rclcpp::Waitable::take_data() + */ + RCLCPP_PUBLIC + std::shared_ptr + take_data() override; + + /// Function to add_handles_to_wait_set and wait for work and + /** + * block until the wait set is ready or until the timeout has been exceeded. + * \throws std::runtime_error if wait set couldn't be cleared or filled. + * \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error() + */ + RCLCPP_PUBLIC + void + refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + /** + * \throws std::runtime_error if it couldn't add guard condition to wait set + */ + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() override; + + /// Add a callback group to an executor. + /** + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to an executor. + /** + * \see rclcpp::Executor::add_callback_group + * \return boolean whether the node from the callback group is new + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + bool + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group_from_map + */ + RCLCPP_PUBLIC + bool + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /** + * \see rclcpp::Executor::add_node() + * \throw std::runtime_error if node was already added + */ + RCLCPP_PUBLIC + bool + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /** + * \see rclcpp::Executor::remove_node() + * \throw std::runtime_error if no guard condition is associated with node. + */ + RCLCPP_PUBLIC + bool + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes(); + + /// Complete all available queued work without blocking. + /** + * This function checks if after the guard condition was triggered + * (or a spurious wakeup happened) we are really ready to execute + * i.e. re-collect entities + */ + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + /// Return number of timers + /** + * \return number of timers + */ + RCLCPP_PUBLIC + size_t + get_number_of_timers() {return exec_list_.number_of_timers;} + + /// Return number of subscriptions + /** + * \return number of subscriptions + */ + RCLCPP_PUBLIC + size_t + get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;} + + /// Return number of services + /** + * \return number of services + */ + RCLCPP_PUBLIC + size_t + get_number_of_services() {return exec_list_.number_of_services;} + + /// Return number of clients + /** + * \return number of clients + */ + RCLCPP_PUBLIC + size_t + get_number_of_clients() {return exec_list_.number_of_clients;} + + /// Return number of waitables + /** + * \return number of waitables + */ + RCLCPP_PUBLIC + size_t + get_number_of_waitables() {return exec_list_.number_of_waitables;} + + /** Return a SubscritionBase Sharedptr by index. + * \param[in] i The index of the SubscritionBase + * \return a SubscritionBase shared pointer + * \throws std::out_of_range if the argument is higher than the size of the structrue. + */ + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(size_t i) {return exec_list_.subscription[i];} + + /** Return a TimerBase Sharedptr by index. + * \param[in] i The index of the TimerBase + * \return a TimerBase shared pointer + * \throws std::out_of_range if the argument is higher than the size. + */ + RCLCPP_PUBLIC + rclcpp::TimerBase::SharedPtr + get_timer(size_t i) {return exec_list_.timer[i];} + + /** Return a ServiceBase Sharedptr by index. + * \param[in] i The index of the ServiceBase + * \return a ServiceBase shared pointer + * \throws std::out_of_range if the argument is higher than the size. + */ + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(size_t i) {return exec_list_.service[i];} + + /** Return a ClientBase Sharedptr by index + * \param[in] i The index of the ClientBase + * \return a ClientBase shared pointer + * \throws std::out_of_range if the argument is higher than the size. + */ + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(size_t i) {return exec_list_.client[i];} + + /** Return a Waitable Sharedptr by index + * \param[in] i The index of the Waitable + * \return a Waitable shared pointer + * \throws std::out_of_range if the argument is higher than the size. + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(size_t i) {return exec_list_.waitable[i];} + +private: + /// Function to reallocate space for entities in the wait set. + /** + * \throws std::runtime_error if wait set couldn't be cleared or resized. + */ + void + prepare_wait_set(); + + void + fill_executable_list(); + + void + fill_memory_strategy(); + + /// Return true if the node belongs to the collector + /** + * \param[in] node_ptr a node base interface shared pointer + * \param[in] weak_groups_to_nodes map to nodes to lookup + * \return boolean whether a node belongs the collector + */ + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; + + /// Add all callback groups that can be automatically added by any executor + /// and is not already associated with an executor from nodes + /// that are associated with executor + /** + * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() + */ + void + add_callback_groups_from_nodes_associated_to_executor(); + + void + fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Memory strategy: an interface for handling user-defined memory allocation strategies. + rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + typedef std::map> + WeakNodesToGuardConditionsMap; + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + + /// List of weak nodes registered in the static executor + std::list weak_nodes_; + + // Mutex to protect vector of new nodes. + std::mutex new_nodes_mutex_; + std::vector new_nodes_; + + /// Wait set for managing entities that the rmw layer waits on. + rcl_wait_set_t * p_wait_set_ = nullptr; + + /// Executable list: timers, subscribers, clients, services and waitables + rclcpp::experimental::ExecutableList exec_list_; + + /// Bool to check if the entities collector has been initialized + bool initialized_ = false; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp new file mode 100644 index 0000000000..5294605eaf --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -0,0 +1,223 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rmw/rmw.h" + +#include "rclcpp/executor.hpp" +#include "rclcpp/executors/static_executor_entities_collector.hpp" +#include "rclcpp/experimental/executable_list.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/rate.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/// Static executor implementation +/** + * This executor is a static version of the original single threaded executor. + * It's static because it doesn't reconstruct the executable list for every iteration. + * All nodes, callbackgroups, timers, subscriptions etc. are created before + * spin() is called, and modified only when an entity is added/removed to/from a node. + * + * To run this executor instead of SingleThreadedExecutor replace: + * rclcpp::executors::SingleThreadedExecutor exec; + * by + * rclcpp::executors::StaticSingleThreadedExecutor exec; + * in your source code and spin node(s) in the following way: + * exec.add_node(node); + * exec.spin(); + * exec.remove_node(node); + */ +class StaticSingleThreadedExecutor : public rclcpp::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticSingleThreadedExecutor) + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC + explicit StaticSingleThreadedExecutor( + const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + + /// Default destrcutor. + RCLCPP_PUBLIC + virtual ~StaticSingleThreadedExecutor(); + + /// Static executor implementation of spin. + /** + * This function will block until work comes in, execute it, and keep blocking. + * It will only be interrupted by a CTRL-C (managed by the global signal handler). + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + + /// Static executor implementation of spin some + /** + * This non-blocking function will execute entities that + * were ready when this API was called, until timeout or no + * more work available. Entities that got ready while + * executing work, won't be taken into account here. + * + * Example: + * while(condition) { + * spin_some(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; + + /// Static executor implementation of spin all + /** + * This non-blocking function will execute entities until timeout (must be >= 0) + * or no more work available. + * If timeout is `0`, potentially it blocks forever until no more work is available. + * If new entities get ready while executing work available, they will be executed + * as long as the timeout hasn't expired. + * + * Example: + * while(condition) { + * spin_all(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_all(std::chrono::nanoseconds max_duration) override; + + /// Add a callback group to an executor. + /** + * \sa rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Remove callback group from the executor + /** + * \sa rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true) override; + + /// Add a node to the executor. + /** + * \sa rclcpp::Executor::add_node + */ + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::StaticSingleThreadedExecutor::add_node + */ + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes() override; + +protected: + /** + * @brief Executes ready executables from wait set. + * @param spin_once if true executes only the first ready executable. + * @return true if any executable was ready. + */ + RCLCPP_PUBLIC + bool + execute_ready_executables(bool spin_once = false); + + RCLCPP_PUBLIC + void + spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); + + RCLCPP_PUBLIC + void + spin_once_impl(std::chrono::nanoseconds timeout) override; + +private: + RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) + + StaticExecutorEntitiesCollector::SharedPtr entities_collector_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index c139913be8..b3f41f8ed5 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -661,7 +661,6 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_acquire_ownerships(); RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();}); - // this method comes from the SynchronizationPolicy return this->template sync_wait>( // pass along the time_to_wait duration as nanoseconds diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index a10e8d694f..2dce369b08 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -121,7 +121,6 @@ void check_ready( exec.callback_group = callback_group; if (fill_executable(exec, entity)) { executables.push_back(exec); - } else { } } } diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 2779a9cd30..78e2db91a2 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -48,7 +48,6 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) bool ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) { - std::cout << "ExecutorNotifyWaitable::is_ready" << std::endl; std::lock_guard lock(guard_condition_mutex_); for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) { auto rcl_guard_condition = wait_set->guard_conditions[ii]; @@ -69,7 +68,6 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) void ExecutorNotifyWaitable::execute(std::shared_ptr & data) { - std::cout << "ExecutorNotifyWaitable::execute" << std::endl; (void) data; this->execute_callback_(); } diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp new file mode 100644 index 0000000000..bf50e062f3 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -0,0 +1,524 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_executor_entities_collector.hpp" + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/executors/static_single_threaded_executor.hpp" +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" + +using rclcpp::executors::StaticExecutorEntitiesCollector; + +StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() +{ + // Disassociate all callback groups and thus nodes. + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + // Disassociate all nodes + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); + exec_list_.clear(); + weak_nodes_.clear(); + weak_nodes_to_guard_conditions_.clear(); +} + +void +StaticExecutorEntitiesCollector::init( + rcl_wait_set_t * p_wait_set, + rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) +{ + // Empty initialize executable list + exec_list_ = rclcpp::experimental::ExecutableList(); + // Get executor's wait_set_ pointer + p_wait_set_ = p_wait_set; + // Get executor's memory strategy ptr + if (memory_strategy == nullptr) { + throw std::runtime_error("Received NULL memory strategy in executor waitable."); + } + memory_strategy_ = memory_strategy; + + // Get memory strategy and executable list. Prepare wait_set_ + std::shared_ptr shared_ptr; + execute(shared_ptr); + + // The entities collector is now initialized + initialized_ = true; +} + +void +StaticExecutorEntitiesCollector::fini() +{ + memory_strategy_->clear_handles(); + exec_list_.clear(); +} + +std::shared_ptr +StaticExecutorEntitiesCollector::take_data() +{ + return nullptr; +} + +void +StaticExecutorEntitiesCollector::execute(std::shared_ptr & data) +{ + (void) data; + // Fill memory strategy with entities coming from weak_nodes_ + fill_memory_strategy(); + // Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy) + fill_executable_list(); + // Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize) + prepare_wait_set(); + // Add new nodes guard conditions to map + std::lock_guard guard{new_nodes_mutex_}; + for (const auto & weak_node : new_nodes_) { + if (auto node_ptr = weak_node.lock()) { + const auto & gc = node_ptr->get_notify_guard_condition(); + weak_nodes_to_guard_conditions_[node_ptr] = &gc; + } + } + new_nodes_.clear(); +} + +void +StaticExecutorEntitiesCollector::fill_memory_strategy() +{ + memory_strategy_->clear_handles(); + bool has_invalid_weak_groups_or_nodes = + memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_); + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_groups_or_nodes) { + std::vector invalid_group_ptrs; + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto & weak_group_ptr = pair.first; + auto & weak_node_ptr = pair.second; + if (weak_group_ptr.expired() || weak_node_ptr.expired()) { + invalid_group_ptrs.push_back(weak_group_ptr); + } + } + std::for_each( + invalid_group_ptrs.begin(), invalid_group_ptrs.end(), + [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { + weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); + }); + } + has_invalid_weak_groups_or_nodes = + memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_); + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_groups_or_nodes) { + std::vector invalid_group_ptrs; + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto & weak_group_ptr = pair.first; + const auto & weak_node_ptr = pair.second; + if (weak_group_ptr.expired() || weak_node_ptr.expired()) { + invalid_group_ptrs.push_back(weak_group_ptr); + } + } + std::for_each( + invalid_group_ptrs.begin(), invalid_group_ptrs.end(), + [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { + weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); + }); + } + + // Add the static executor waitable to the memory strategy + memory_strategy_->add_waitable_handle(this->shared_from_this()); +} + +void +StaticExecutorEntitiesCollector::fill_executable_list() +{ + exec_list_.clear(); + add_callback_groups_from_nodes_associated_to_executor(); + fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_); + fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_); + // Add the executor's waitable to the executable list + exec_list_.add_waitable(shared_from_this()); +} +void +StaticExecutorEntitiesCollector::fill_executable_list_from_map( + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) +{ + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!node || !group || !group->can_be_taken_from().load()) { + continue; + } + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + exec_list_.add_timer(timer); + } + return false; + }); + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + exec_list_.add_subscription(subscription); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + exec_list_.add_service(service); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + exec_list_.add_client(client); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + exec_list_.add_waitable(waitable); + } + return false; + }); + } +} + +void +StaticExecutorEntitiesCollector::prepare_wait_set() +{ + // clear wait set + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + rcl_ret_t ret = rcl_wait_set_resize( + p_wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + + if (RCL_RET_OK != ret) { + throw std::runtime_error( + std::string("Couldn't resize the wait set: ") + rcl_get_error_string().str); + } +} + +void +StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout) +{ + // clear wait set (memset to '0' all wait_set_ entities + // but keeps the wait_set_ number of entities) + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) { + throw std::runtime_error("Couldn't fill wait set"); + } + + rcl_ret_t status = + rcl_wait(p_wait_set_, std::chrono::duration_cast(timeout).count()); + + if (status == RCL_RET_WAIT_SET_EMPTY) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } +} + +void +StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + // Add waitable guard conditions (one for each registered node) into the wait set. + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto & gc = pair.second; + detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc); + } +} + +size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions() +{ + std::lock_guard guard{new_nodes_mutex_}; + return weak_nodes_to_guard_conditions_.size() + new_nodes_.size(); +} + +bool +StaticExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + bool is_new_node = false; + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + node_ptr->for_each_callback_group( + [this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + if ( + !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + is_new_node = (add_callback_group( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_) || + is_new_node); + } + }); + weak_nodes_.push_back(node_ptr); + return is_new_node; +} + +bool +StaticExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + // If the callback_group already has an executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto insert_info = weak_groups_to_nodes.insert( + std::make_pair(weak_group_ptr, node_ptr)); + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + if (is_new_node) { + std::lock_guard guard{new_nodes_mutex_}; + new_nodes_.push_back(node_ptr); + return true; + } + return false; +} + +bool +StaticExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); +} + +bool +StaticExecutorEntitiesCollector::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + return this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_); +} + +bool +StaticExecutorEntitiesCollector::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter != weak_groups_to_nodes.end()) { + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + weak_groups_to_nodes.erase(iter); + } else { + throw std::runtime_error("Callback group needs to be associated with executor."); + } + // If the node was matched and removed, interrupt waiting. + if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_)) + { + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); + weak_nodes_to_guard_conditions_.erase(node_weak_ptr); + return true; + } + return false; +} + +bool +StaticExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + if (!node_ptr->get_associated_with_executor_atomic().load()) { + return false; + } + bool node_found = false; + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + weak_nodes_.erase(node_it); + node_found = true; + break; + } + ++node_it; + } + if (!node_found) { + return false; + } + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto & weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + this->remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_); + }); + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + return true; +} + +bool +StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) +{ + // Check wait_set guard_conditions for added/removed entities to/from a node + for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { + if (p_wait_set->guard_conditions[i] != NULL) { + auto found_guard_condition = std::find_if( + weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(), + [&](std::pair pair) -> bool { + const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition(); + return &rcl_gc == p_wait_set->guard_conditions[i]; + }); + if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) { + return true; + } + } + } + // None of the guard conditions triggered belong to a registered node + return false; +} + +// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. +bool +StaticExecutorEntitiesCollector::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} + +void +StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() +{ + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + node->for_each_callback_group( + [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) + { + if (shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + add_callback_group( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_); + } + }); + } + } +} + +std::vector +StaticExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +StaticExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +StaticExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp new file mode 100644 index 0000000000..209fcde556 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -0,0 +1,280 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_single_threaded_executor.hpp" + +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" + +using rclcpp::executors::StaticSingleThreadedExecutor; +using rclcpp::experimental::ExecutableList; + +StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( + const rclcpp::ExecutorOptions & options) +: rclcpp::Executor(options) +{ + entities_collector_ = std::make_shared(); +} + +StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() +{ + if (entities_collector_->is_init()) { + entities_collector_->fini(); + } +} + +void +StaticSingleThreadedExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + // Set memory_strategy_ and exec_list_ based on weak_nodes_ + // Prepare wait_set_ based on memory_strategy_ + entities_collector_->init(&wait_set_, memory_strategy_); + + while (rclcpp::ok(this->context_) && spinning.load()) { + // Refresh wait set and wait for work + entities_collector_->refresh_wait_set(); + execute_ready_executables(); + } +} + +void +StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + // In this context a 0 input max_duration means no duration limit + if (std::chrono::nanoseconds(0) == max_duration) { + max_duration = std::chrono::nanoseconds::max(); + } + + return this->spin_some_impl(max_duration, false); +} + +void +StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration < std::chrono::nanoseconds(0)) { + throw std::invalid_argument("max_duration must be greater than or equal to 0"); + } + return this->spin_some_impl(max_duration, true); +} + +void +StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) +{ + // Make sure the entities collector has been initialized + if (!entities_collector_->is_init()) { + entities_collector_->init(&wait_set_, memory_strategy_); + } + + auto start = std::chrono::steady_clock::now(); + auto max_duration_not_elapsed = [max_duration, start]() { + if (std::chrono::nanoseconds(0) == max_duration) { + // told to spin forever if need be + return true; + } else if (std::chrono::steady_clock::now() - start < max_duration) { + // told to spin only for some maximum amount of time + return true; + } + // spun too long + return false; + }; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { + // Get executables that are ready now + entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero()); + // Execute ready executables + bool work_available = execute_ready_executables(); + if (!work_available || !exhaustive) { + break; + } + } +} + +void +StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) +{ + // Make sure the entities collector has been initialized + if (!entities_collector_->is_init()) { + entities_collector_->init(&wait_set_, memory_strategy_); + } + + if (rclcpp::ok(context_) && spinning.load()) { + // Wait until we have a ready entity or timeout expired + entities_collector_->refresh_wait_set(timeout); + // Execute ready executables + execute_ready_executables(true); + } +} + +void +StaticSingleThreadedExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr); + if (is_new_node && notify) { + // Interrupt waiting to handle new node + interrupt_guard_condition_.trigger(); + } +} + +void +StaticSingleThreadedExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + bool is_new_node = entities_collector_->add_node(node_ptr); + if (is_new_node && notify) { + // Interrupt waiting to handle new node + interrupt_guard_condition_.trigger(); + } +} + +void +StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, bool notify) +{ + this->add_node(node_ptr->get_node_base_interface(), notify); +} + +void +StaticSingleThreadedExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) +{ + bool node_removed = entities_collector_->remove_callback_group(group_ptr); + // If the node was matched and removed, interrupt waiting + if (node_removed && notify) { + interrupt_guard_condition_.trigger(); + } +} + +void +StaticSingleThreadedExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + bool node_removed = entities_collector_->remove_node(node_ptr); + if (!node_removed) { + throw std::runtime_error("Node needs to be associated with this executor."); + } + // If the node was matched and removed, interrupt waiting + if (notify) { + interrupt_guard_condition_.trigger(); + } +} + +std::vector +StaticSingleThreadedExecutor::get_all_callback_groups() +{ + return entities_collector_->get_all_callback_groups(); +} + +std::vector +StaticSingleThreadedExecutor::get_manually_added_callback_groups() +{ + return entities_collector_->get_manually_added_callback_groups(); +} + +std::vector +StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes() +{ + return entities_collector_->get_automatically_added_callback_groups_from_nodes(); +} + +void +StaticSingleThreadedExecutor::remove_node(std::shared_ptr node_ptr, bool notify) +{ + this->remove_node(node_ptr->get_node_base_interface(), notify); +} + +bool +StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once) +{ + bool any_ready_executable = false; + + // Execute all the ready subscriptions + for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { + if (i < entities_collector_->get_number_of_subscriptions()) { + if (wait_set_.subscriptions[i]) { + execute_subscription(entities_collector_->get_subscription(i)); + if (spin_once) { + return true; + } + any_ready_executable = true; + } + } + } + // Execute all the ready timers + for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { + if (i < entities_collector_->get_number_of_timers()) { + if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) { + auto timer = entities_collector_->get_timer(i); + timer->call(); + execute_timer(std::move(timer)); + if (spin_once) { + return true; + } + any_ready_executable = true; + } + } + } + // Execute all the ready services + for (size_t i = 0; i < wait_set_.size_of_services; ++i) { + if (i < entities_collector_->get_number_of_services()) { + if (wait_set_.services[i]) { + execute_service(entities_collector_->get_service(i)); + if (spin_once) { + return true; + } + any_ready_executable = true; + } + } + } + // Execute all the ready clients + for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { + if (i < entities_collector_->get_number_of_clients()) { + if (wait_set_.clients[i]) { + execute_client(entities_collector_->get_client(i)); + if (spin_once) { + return true; + } + any_ready_executable = true; + } + } + } + // Execute all the ready waitables + for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) { + auto waitable = entities_collector_->get_waitable(i); + if (waitable->is_ready(&wait_set_)) { + auto data = waitable->take_data(); + waitable->execute(data); + if (spin_once) { + return true; + } + any_ready_executable = true; + } + } + return any_ready_executable; +}