diff --git a/rclpy/rclpy/qos_event.py b/rclpy/rclpy/qos_event.py index 7a4ff80e9..ff48fc457 100644 --- a/rclpy/rclpy/qos_event.py +++ b/rclpy/rclpy/qos_event.py @@ -22,6 +22,8 @@ from rclpy.callback_groups import CallbackGroup from rclpy.handle import Handle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.logging import get_logger +from rclpy.qos import qos_policy_name_from_kind from rclpy.waitable import NumberOfEntities from rclpy.waitable import Waitable @@ -186,6 +188,7 @@ def __init__( deadline: Optional[Callable[[QoSRequestedDeadlineMissedInfo], None]] = None, liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None, incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None, + use_default_callbacks: bool = True, ) -> None: """ Create a SubscriptionEventCallbacks container. @@ -194,14 +197,22 @@ def __init__( requested Deadline. :param liveliness: A user-defined callback that is called when the Liveliness of a Publisher on subscribed topic changes. + :param incompatible_qos: A user-defined callback that is called when a Publisher + with incompatible QoS policies is discovered on subscribed topic. + :param use_default_callbacks: Whether or not to use default callbacks when the user + doesn't supply one """ self.deadline = deadline self.liveliness = liveliness self.incompatible_qos = incompatible_qos + self.use_default_callbacks = use_default_callbacks def create_event_handlers( self, callback_group: CallbackGroup, subscription_handle: Handle, ) -> List[QoSEventHandler]: + with subscription_handle as subscription_capsule: + logger = get_logger(_rclpy.rclpy_get_subscription_logger_name(subscription_capsule)) + event_handlers = [] if self.deadline: event_handlers.append(QoSEventHandler( @@ -209,18 +220,44 @@ def create_event_handlers( callback=self.deadline, event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED, parent_handle=subscription_handle)) + if self.liveliness: event_handlers.append(QoSEventHandler( callback_group=callback_group, callback=self.liveliness, event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED, parent_handle=subscription_handle)) + if self.incompatible_qos: event_handlers.append(QoSEventHandler( callback_group=callback_group, callback=self.incompatible_qos, event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS, parent_handle=subscription_handle)) + elif self.use_default_callbacks: + # Register default callback when not specified + try: + def _default_incompatible_qos_callback(event): + policy_name = qos_policy_name_from_kind(event.last_policy_kind) + logger.warn( + 'New publisher discovered on this topic, offering incompatible QoS. ' + 'No messages will be received from it. ' + 'Last incompatible policy: {}'.format(policy_name)) + + event_type = QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=_default_incompatible_qos_callback, + event_type=event_type, + parent_handle=subscription_handle)) + + except UnsupportedEventTypeError: + logger.warn( + 'This rmw implementation does not support ON_REQUESTED_INCOMPATIBLE_QOS ' + 'events, you will not be notified when Subscriptions request an incompatible ' + 'QoS profile from Publishers on the same topic.', + once=True) + return event_handlers @@ -232,7 +269,8 @@ def __init__( *, deadline: Optional[Callable[[QoSOfferedDeadlineMissedInfo], None]] = None, liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None, - incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None + incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None, + use_default_callbacks: bool = True, ) -> None: """ Create and return a PublisherEventCallbacks container. @@ -241,14 +279,22 @@ def __init__( its offered Deadline. :param liveliness: A user-defined callback that is called when this Publisher fails to signal its Liveliness and is reported as not-alive. + :param incompatible_qos: A user-defined callback that is called when a Subscription + with incompatible QoS policies is discovered on subscribed topic. + :param use_default_callbacks: Whether or not to use default callbacks when the user + doesn't supply one """ self.deadline = deadline self.liveliness = liveliness self.incompatible_qos = incompatible_qos + self.use_default_callbacks = use_default_callbacks def create_event_handlers( self, callback_group: CallbackGroup, publisher_handle: Handle, ) -> List[QoSEventHandler]: + with publisher_handle as publisher_capsule: + logger = get_logger(_rclpy.rclpy_get_publisher_logger_name(publisher_capsule)) + event_handlers = [] if self.deadline: event_handlers.append(QoSEventHandler( @@ -256,16 +302,41 @@ def create_event_handlers( callback=self.deadline, event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED, parent_handle=publisher_handle)) + if self.liveliness: event_handlers.append(QoSEventHandler( callback_group=callback_group, callback=self.liveliness, event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST, parent_handle=publisher_handle)) + if self.incompatible_qos: event_handlers.append(QoSEventHandler( callback_group=callback_group, callback=self.incompatible_qos, event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS, parent_handle=publisher_handle)) + elif self.use_default_callbacks: + # Register default callback when not specified + try: + def _default_incompatible_qos_callback(event): + policy_name = qos_policy_name_from_kind(event.last_policy_kind) + logger.warn( + 'New subscription discovered on this topic, requesting incompatible QoS. ' + 'No messages will be sent to it. ' + 'Last incompatible policy: {}'.format(policy_name)) + + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=_default_incompatible_qos_callback, + event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS, + parent_handle=publisher_handle)) + + except UnsupportedEventTypeError: + logger.warn( + 'This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS ' + 'events, you will not be notified when Publishers offer an incompatible ' + 'QoS profile to Subscriptions on the same topic.', + once=True) + return event_handlers diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index be0c813bb..4dcc8e20c 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -866,6 +866,65 @@ rclpy_get_node_logger_name(PyObject * Py_UNUSED(self), PyObject * args) return PyUnicode_FromString(node_logger_name); } +/// Get the name of the logger associated with the node of the publisher. +/** + * Raises ValueError if pypublisher is not a publisher capsule + * + * \param[in] pypublisher Capsule pointing to the publisher to get the logger name of + * \return logger_name, or + * \return None on failure + */ +static PyObject * +rclpy_get_publisher_logger_name(PyObject * Py_UNUSED(self), PyObject * args) +{ + PyObject * pypublisher; + if (!PyArg_ParseTuple(args, "O", &pypublisher)) { + return NULL; + } + + rclpy_publisher_t * pub = rclpy_handle_get_pointer_from_capsule(pypublisher, "rclpy_publisher_t"); + if (NULL == pub) { + return NULL; + } + + const char * node_logger_name = rcl_node_get_logger_name(pub->node); + if (NULL == node_logger_name) { + Py_RETURN_NONE; + } + + return PyUnicode_FromString(node_logger_name); +} + +/// Get the name of the logger associated with the node of the subscription. +/** + * Raises ValueError if pysubscription is not a subscription capsule + * + * \param[in] pysubscription Capsule pointing to the subscription to get the logger name of + * \return logger_name, or + * \return None on failure + */ +static PyObject * +rclpy_get_subscription_logger_name(PyObject * Py_UNUSED(self), PyObject * args) +{ + PyObject * pysubscription; + if (!PyArg_ParseTuple(args, "O", &pysubscription)) { + return NULL; + } + + rclpy_subscription_t * sub = + rclpy_handle_get_pointer_from_capsule(pysubscription, "rclpy_subscription_t"); + if (NULL == sub) { + return NULL; + } + + const char * node_logger_name = rcl_node_get_logger_name(sub->node); + if (NULL == node_logger_name) { + Py_RETURN_NONE; + } + + return PyUnicode_FromString(node_logger_name); +} + typedef rcl_ret_t (* count_func)(const rcl_node_t * node, const char * topic_name, size_t * count); static PyObject * @@ -5218,6 +5277,14 @@ static PyMethodDef rclpy_methods[] = { "rclpy_get_node_logger_name", rclpy_get_node_logger_name, METH_VARARGS, "Get the logger name associated with a node." }, + { + "rclpy_get_publisher_logger_name", rclpy_get_publisher_logger_name, METH_VARARGS, + "Get the logger name associated with the node of a publisher." + }, + { + "rclpy_get_subscription_logger_name", rclpy_get_subscription_logger_name, METH_VARARGS, + "Get the logger name associated with the node of a subscription." + }, { "rclpy_count_publishers", rclpy_count_publishers, METH_VARARGS, "Count publishers for a topic." diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index dcde88c36..27f31940d 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -18,7 +18,9 @@ import rclpy from rclpy.handle import Handle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.qos import QoSDurabilityPolicy from rclpy.qos import QoSPolicyKind +from rclpy.qos import QoSProfile from rclpy.qos_event import PublisherEventCallbacks from rclpy.qos_event import QoSLivelinessChangedInfo from rclpy.qos_event import QoSLivelinessLostInfo @@ -30,6 +32,7 @@ from rclpy.qos_event import QoSSubscriptionEventType from rclpy.qos_event import SubscriptionEventCallbacks from rclpy.qos_event import UnsupportedEventTypeError +from rclpy.task import Future from rclpy.utilities import get_rmw_implementation_identifier from test_msgs.msg import Empty as EmptyMsg @@ -37,6 +40,7 @@ class TestQoSEvent(unittest.TestCase): is_fastrtps = False + topic_name = 'test_topic' def setUp(self): self.context = rclpy.context.Context() @@ -55,37 +59,40 @@ def test_publisher_constructor(self): liveliness_callback = Mock() deadline_callback = Mock() incompatible_qos_callback = Mock() + expected_num_event_handlers = 0 if self.is_fastrtps else 1 # No arg - publisher = self.node.create_publisher(EmptyMsg, 'test_topic', 10) - self.assertEqual(len(publisher.event_handlers), 0) + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) + self.assertEqual(len(publisher.event_handlers), expected_num_event_handlers) self.node.destroy_publisher(publisher) # Arg with no callbacks publisher = self.node.create_publisher( - EmptyMsg, 'test_topic', 10, event_callbacks=callbacks) - self.assertEqual(len(publisher.event_handlers), 0) + EmptyMsg, self.topic_name, 10, event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), expected_num_event_handlers) self.node.destroy_publisher(publisher) # Arg with one of the callbacks callbacks.deadline = deadline_callback + expected_num_event_handlers += 1 publisher = self.node.create_publisher( - EmptyMsg, 'test_topic', 10, event_callbacks=callbacks) - self.assertEqual(len(publisher.event_handlers), 1) + EmptyMsg, self.topic_name, 10, event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), expected_num_event_handlers) self.node.destroy_publisher(publisher) # Arg with two callbacks callbacks.liveliness = liveliness_callback + expected_num_event_handlers += 1 publisher = self.node.create_publisher( - EmptyMsg, 'test_topic', 10, event_callbacks=callbacks) - self.assertEqual(len(publisher.event_handlers), 2) + EmptyMsg, self.topic_name, 10, event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), expected_num_event_handlers) self.node.destroy_publisher(publisher) # Arg with three callbacks callbacks.incompatible_qos = incompatible_qos_callback try: publisher = self.node.create_publisher( - EmptyMsg, 'test_topic', 10, event_callbacks=callbacks) + EmptyMsg, self.topic_name, 10, event_callbacks=callbacks) self.assertEqual(len(publisher.event_handlers), 3) self.node.destroy_publisher(publisher) except UnsupportedEventTypeError: @@ -97,42 +104,98 @@ def test_subscription_constructor(self): deadline_callback = Mock() message_callback = Mock() incompatible_qos_callback = Mock() + expected_num_event_handlers = 0 if self.is_fastrtps else 1 # No arg - subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback, 10) - self.assertEqual(len(subscription.event_handlers), 0) + subscription = self.node.create_subscription( + EmptyMsg, self.topic_name, message_callback, 10) + self.assertEqual(len(subscription.event_handlers), expected_num_event_handlers) self.node.destroy_subscription(subscription) # Arg with no callbacks subscription = self.node.create_subscription( - EmptyMsg, 'test_topic', message_callback, 10, event_callbacks=callbacks) - self.assertEqual(len(subscription.event_handlers), 0) + EmptyMsg, self.topic_name, message_callback, 10, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), expected_num_event_handlers) self.node.destroy_subscription(subscription) # Arg with one of the callbacks callbacks.deadline = deadline_callback + expected_num_event_handlers += 1 subscription = self.node.create_subscription( - EmptyMsg, 'test_topic', message_callback, 10, event_callbacks=callbacks) - self.assertEqual(len(subscription.event_handlers), 1) + EmptyMsg, self.topic_name, message_callback, 10, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), expected_num_event_handlers) self.node.destroy_subscription(subscription) # Arg with two callbacks callbacks.liveliness = liveliness_callback + expected_num_event_handlers += 1 subscription = self.node.create_subscription( - EmptyMsg, 'test_topic', message_callback, 10, event_callbacks=callbacks) - self.assertEqual(len(subscription.event_handlers), 2) + EmptyMsg, self.topic_name, message_callback, 10, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), expected_num_event_handlers) self.node.destroy_subscription(subscription) # Arg with three callbacks callbacks.incompatible_qos = incompatible_qos_callback try: subscription = self.node.create_subscription( - EmptyMsg, 'test_topic', message_callback, 10, event_callbacks=callbacks) + EmptyMsg, self.topic_name, message_callback, 10, event_callbacks=callbacks) self.assertEqual(len(subscription.event_handlers), 3) self.node.destroy_subscription(subscription) except UnsupportedEventTypeError: self.assertTrue(self.is_fastrtps) + def test_default_incompatible_qos_callbacks(self): + original_logger = rclpy.logging._root_logger + + pub_log_msg = None + sub_log_msg = None + log_msgs_future = Future() + + class MockLogger: + + def get_child(self, name): + return self + + def warn(self, message, once=False): + nonlocal pub_log_msg, sub_log_msg, log_msgs_future + + if message.startswith('New subscription discovered'): + pub_log_msg = message + elif message.startswith('New publisher discovered'): + sub_log_msg = message + + if pub_log_msg is not None and sub_log_msg is not None: + log_msgs_future.set_result(True) + + rclpy.logging._root_logger = MockLogger() + + qos_profile_publisher = QoSProfile( + depth=10, durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE) + self.node.create_publisher(EmptyMsg, self.topic_name, qos_profile_publisher) + + message_callback = Mock() + qos_profile_subscription = QoSProfile( + depth=10, durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) + self.node.create_subscription( + EmptyMsg, self.topic_name, message_callback, qos_profile_subscription) + + executor = rclpy.executors.SingleThreadedExecutor(context=self.context) + rclpy.spin_until_future_complete(self.node, log_msgs_future, executor, 10.0) + + if not self.is_fastrtps: + self.assertEqual( + pub_log_msg, + 'New subscription discovered on this topic, requesting incompatible QoS. ' + 'No messages will be sent to it. ' + 'Last incompatible policy: DURABILITY_QOS_POLICY') + self.assertEqual( + sub_log_msg, + 'New publisher discovered on this topic, offering incompatible QoS. ' + 'No messages will be received from it. ' + 'Last incompatible policy: DURABILITY_QOS_POLICY') + + rclpy.logging._root_logger = original_logger + def _create_event_handle(self, parent_entity, event_type): with parent_entity.handle as parent_capsule: event_capsule = _rclpy.rclpy_create_event(event_type, parent_capsule) @@ -144,7 +207,7 @@ def _do_create_destroy(self, parent_entity, event_type): handle.destroy() def test_publisher_event_create_destroy(self): - publisher = self.node.create_publisher(EmptyMsg, 'test_topic', 10) + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) self._do_create_destroy( publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) self._do_create_destroy( @@ -158,7 +221,8 @@ def test_publisher_event_create_destroy(self): def test_subscription_event_create_destroy(self): message_callback = Mock() - subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback, 10) + subscription = self.node.create_subscription( + EmptyMsg, self.topic_name, message_callback, 10) self._do_create_destroy( subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED) self._do_create_destroy( @@ -173,7 +237,7 @@ def test_subscription_event_create_destroy(self): def test_call_publisher_rclpy_event_apis(self): # Go through the exposed apis and ensure that things don't explode when called # Make no assumptions about being able to actually receive the events - publisher = self.node.create_publisher(EmptyMsg, 'test_topic', 10) + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() with self.context.handle as context_handle: _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 3, context_handle) @@ -255,7 +319,7 @@ def test_call_publisher_rclpy_event_apis(self): def test_call_subscription_rclpy_event_apis(self): # Go through the exposed apis and ensure that things don't explode when called # Make no assumptions about being able to actually receive the events - subscription = self.node.create_subscription(EmptyMsg, 'test_topic', Mock(), 10) + subscription = self.node.create_subscription(EmptyMsg, self.topic_name, Mock(), 10) wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() with self.context.handle as context_handle: _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 3, context_handle)