Skip to content

Commit

Permalink
Add pending queue to collector, remove from waitable
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <mjcarroll@intrinsic.ai>
  • Loading branch information
mjcarroll committed Apr 3, 2023
1 parent 653d1a3 commit a708084
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 111 deletions.
43 changes: 24 additions & 19 deletions rclcpp/include/rclcpp/executors/executor_entities_collector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class ExecutorEntitiesCollector
RCLCPP_PUBLIC
~ExecutorEntitiesCollector();

bool has_pending();

/// Add a node to the entity collector
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
Expand Down Expand Up @@ -164,63 +166,66 @@ class ExecutorEntitiesCollector

using WeakNodesToGuardConditionsMap = std::map<
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>;

using WeakGroupsToGuardConditionsMap = std::map<
rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>;

RCLCPP_PUBLIC
NodeCollection::iterator
remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_);
remove_weak_node(NodeCollection::iterator weak_node);

RCLCPP_PUBLIC
CallbackGroupCollection::iterator
remove_weak_callback_group(
CallbackGroupCollection::iterator weak_group_it,
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);
CallbackGroupCollection & collection);

RCLCPP_PUBLIC
void
add_callback_group_to_collection(
rclcpp::CallbackGroup::SharedPtr group_ptr,
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);
CallbackGroupCollection & collection);

RCLCPP_PUBLIC
void
remove_callback_group_from_collection(
rclcpp::CallbackGroup::SharedPtr group_ptr,
CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_);
CallbackGroupCollection & collection);

RCLCPP_PUBLIC
void
add_automatically_associated_callback_groups(
const NodeCollection & nodes_to_check)
RCPPUTILS_TSA_REQUIRES(mutex_);
process_queues();

RCLCPP_PUBLIC
void
prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_);
add_automatically_associated_callback_groups(
const NodeCollection & nodes_to_check);

mutable std::mutex mutex_;
RCLCPP_PUBLIC
void
prune_invalid_nodes_and_groups();

/// Callback groups that were added via `add_callback_group`
CallbackGroupCollection
manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
CallbackGroupCollection manually_added_groups_;

/// Callback groups that were added by their association with added nodes
CallbackGroupCollection
automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
CallbackGroupCollection automatically_added_groups_;

/// nodes that are associated with the executor
NodeCollection
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
NodeCollection weak_nodes_;

WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::mutex pending_mutex_;
NodeCollection pending_added_nodes_;
NodeCollection pending_removed_nodes_;
CallbackGroupCollection pending_manually_added_groups_;
CallbackGroupCollection pending_manually_removed_groups_;

WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_;

std::shared_ptr<ExecutorNotifyWaitable> notify_waitable_;
};
Expand Down
23 changes: 13 additions & 10 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <list>
#include <memory>
#include <set>

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -33,6 +34,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
public:
RCLCPP_SMART_PTR_DEFINITIONS(ExecutorNotifyWaitable)


// Constructor
/**
* \param[in] on_execute_callback Callback to execute when one of the conditions
Expand All @@ -45,6 +47,13 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
RCLCPP_PUBLIC
~ExecutorNotifyWaitable() override = default;

RCLCPP_PUBLIC
ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable& operator=(const ExecutorNotifyWaitable & other );

/// Add conditions to the wait set
/**
* \param[inout] wait_set structure that conditions will be added to
Expand Down Expand Up @@ -85,15 +94,15 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
void
add_guard_condition(const rclcpp::GuardCondition * guard_condition);
add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);

/// Remove a guard condition from being waited on.
/**
* \param[in] guard_condition The guard condition to remove.
*/
RCLCPP_PUBLIC
void
remove_guard_condition(const rclcpp::GuardCondition * guard_condition);
remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);

/// Get the number of ready guard_conditions
/**
Expand All @@ -107,17 +116,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
/// Callback to run when waitable executes
std::function<void(void)> execute_callback_;

/// The collection of guard conditions to be waited on.
std::mutex guard_condition_mutex_;

/// The collection of guard conditions to be waited on.
std::list<const rclcpp::GuardCondition *> notify_guard_conditions_;

/// The collection of guard conditions to be waited on.
std::list<const rclcpp::GuardCondition *> to_add_;

/// The collection of guard conditions to be waited on.
std::list<const rclcpp::GuardCondition *> to_remove_;
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
};

} // namespace executors
Expand Down
Loading

0 comments on commit a708084

Please sign in to comment.