From 636a4549d4c892dee2e0b88f9dbdcc48092a593d Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Thu, 25 Apr 2019 16:30:06 -0700 Subject: [PATCH] API and implementation for Liveliness and Deadline QoS event callbacks Signed-off-by: Emerson Knapp --- rclpy/rclpy/executors.py | 1 + rclpy/rclpy/node.py | 36 +++- rclpy/rclpy/publisher.py | 11 +- rclpy/rclpy/qos_event.py | 224 +++++++++++++++++++ rclpy/rclpy/subscription.py | 9 +- rclpy/rclpy/waitable.py | 16 +- rclpy/src/rclpy/_rclpy.c | 36 +++- rclpy/src/rclpy/_rclpy_qos_event.c | 334 +++++++++++++++++++++++++++++ rclpy/test/test_qos_event.py | 233 ++++++++++++++++++++ rclpy/test/test_waitable.py | 10 +- 10 files changed, 889 insertions(+), 21 deletions(-) create mode 100644 rclpy/rclpy/qos_event.py create mode 100644 rclpy/src/rclpy/_rclpy_qos_event.c create mode 100644 rclpy/test/test_qos_event.py diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index cd8344628..250ef3026 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -490,6 +490,7 @@ def _wait_for_ready_callbacks( entity_count.num_timers, entity_count.num_clients, entity_count.num_services, + entity_count.num_events, self._context.handle) entities = { diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index 3212accb5..209a34f03 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -43,6 +43,8 @@ from rclpy.qos import qos_profile_parameter_events from rclpy.qos import qos_profile_services_default from rclpy.qos import QoSProfile +from rclpy.qos_event import PublisherEventCallbacks +from rclpy.qos_event import SubscriptionEventCallbacks from rclpy.service import Service from rclpy.subscription import Subscription from rclpy.time_source import TimeSource @@ -349,7 +351,9 @@ def create_publisher( msg_type, topic: str, *, - qos_profile: QoSProfile = qos_profile_default + qos_profile: QoSProfile = qos_profile_default, + callback_group: Optional[CallbackGroup] = None, + event_callbacks: Optional[PublisherEventCallbacks] = None, ) -> Publisher: """ Create a new publisher. @@ -357,8 +361,13 @@ def create_publisher( :param msg_type: The type of ROS messages the publisher will publish. :param topic: The name of the topic the publisher will publish to. :param qos_profile: The quality of service profile to apply to the publisher. + :param callback_group: The callback group for the publisher's event handlers. + If ``None``, then the node's default callback group is used. + :param event_callbacks: User-defined callbacks for middleware events. :return: The new publisher. """ + callback_group = callback_group or self.default_callback_group + # this line imports the typesupport for the message module if not already done check_for_type_support(msg_type) failed = False @@ -369,8 +378,16 @@ def create_publisher( failed = True if failed: self._validate_topic_or_service_name(topic) - publisher = Publisher(publisher_handle, msg_type, topic, qos_profile, self.handle) + + publisher = Publisher( + publisher_handle, msg_type, topic, qos_profile, self.handle, + event_callbacks=event_callbacks or PublisherEventCallbacks(), + callback_group=callback_group) self.publishers.append(publisher) + + for event_callback in publisher.event_handlers: + self.add_waitable(event_callback) + return publisher def create_subscription( @@ -380,7 +397,8 @@ def create_subscription( callback: Callable[[MsgType], None], *, qos_profile: QoSProfile = qos_profile_default, - callback_group: CallbackGroup = None, + callback_group: Optional[CallbackGroup] = None, + event_callbacks: Optional[SubscriptionEventCallbacks] = None, raw: bool = False ) -> Subscription: """ @@ -393,11 +411,12 @@ def create_subscription( :param qos_profile: The quality of service profile to apply to the subscription. :param callback_group: The callback group for the subscription. If ``None``, then the nodes default callback group is used. + :param event_callbacks: User-defined callbacks for middleware events. :param raw: If ``True``, then received messages will be stored in raw binary representation. """ - if callback_group is None: - callback_group = self.default_callback_group + callback_group = callback_group or self.default_callback_group + # this line imports the typesupport for the message module if not already done check_for_type_support(msg_type) failed = False @@ -413,9 +432,14 @@ def create_subscription( subscription = Subscription( subscription_handle, msg_type, - topic, callback, callback_group, qos_profile, self.handle, raw) + topic, callback, callback_group, qos_profile, self.handle, raw, + event_callbacks=event_callbacks or SubscriptionEventCallbacks()) self.subscriptions.append(subscription) callback_group.add_entity(subscription) + + for event_handler in subscription.event_handlers: + self.add_waitable(event_handler) + return subscription def create_client( diff --git a/rclpy/rclpy/publisher.py b/rclpy/rclpy/publisher.py index bef504e96..914ed15c1 100644 --- a/rclpy/rclpy/publisher.py +++ b/rclpy/rclpy/publisher.py @@ -14,8 +14,10 @@ from typing import TypeVar +from rclpy.callback_groups import CallbackGroup from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.qos import QoSProfile +from rclpy.qos_event import PublisherEventCallbacks MsgType = TypeVar('MsgType') @@ -28,7 +30,9 @@ def __init__( msg_type: MsgType, topic: str, qos_profile: QoSProfile, - node_handle + node_handle, + event_callbacks: PublisherEventCallbacks, + callback_group: CallbackGroup, ) -> None: """ Create a container for a ROS publisher. @@ -51,6 +55,11 @@ def __init__( self.topic = topic self.qos_profile = qos_profile self.node_handle = node_handle + self.callback_group = callback_group + + self.event_callbacks = event_callbacks + self.event_handlers = event_callbacks.create_event_handlers( + callback_group, publisher_handle) def publish(self, msg: MsgType) -> None: """ diff --git a/rclpy/rclpy/qos_event.py b/rclpy/rclpy/qos_event.py new file mode 100644 index 000000000..0fce11698 --- /dev/null +++ b/rclpy/rclpy/qos_event.py @@ -0,0 +1,224 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import IntEnum +from typing import Callable +from typing import List +from typing import NamedTuple + +from rclpy.callback_groups import CallbackGroup +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.waitable import NumberOfEntities +from rclpy.waitable import Waitable + + +class QoSPublisherEventType(IntEnum): + """ + Enum for types of QoS events that a Publisher can receive. + + This enum matches the one defined in rcl/event.h + """ + + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = 0 + RCL_PUBLISHER_LIVELINESS_LOST = 1 + + +class QoSSubscriptionEventType(IntEnum): + """ + Enum for types of QoS events that a Subscription can receive. + + This enum matches the one defined in rcl/event.h + """ + + RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = 0 + RCL_SUBSCRIPTION_LIVELINESS_CHANGED = 1 + + +""" +Payload type for Subscription Deadline callback. + +Mirrors rmw_requested_deadline_missed_status_t from rmw/types.h +""" +QoSRequestedDeadlineMissedInfo = NamedTuple( + 'QoSRequestedDeadlineMissedInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + +""" +Payload type for Subscription Liveliness callback. + +Mirrors rmw_liveliness_changed_status_t from rmw/types.h +""" +QoSLivelinessChangedInfo = NamedTuple( + 'QoSLivelinessChangedInfo', [ + ('alive_count', 'int'), + ('not_alive_count', 'int'), + ('alive_count_change', 'int'), + ('not_alive_count_change', 'int'), + ]) + +""" +Payload type for Publisher Deadline callback. + +Mirrors rmw_offered_deadline_missed_status_t from rmw/types.h +""" +QoSOfferedDeadlineMissedInfo = NamedTuple( + 'QoSOfferedDeadlineMissedInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + +""" +Payload type for Publisher Liveliness callback. + +Mirrors rmw_liveliness_lost_status_t from rmw/types.h +""" +QoSLivelinessLostInfo = NamedTuple( + 'QoSLivelinessLostInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + + +class QoSEventHandler(Waitable): + """Waitable type to handle QoS events.""" + + def __init__( + self, + *, + callback_group, + callback, + event_type, + parent_handle, + ): + # Waitable init adds self to callback_group + super().__init__(callback_group) + self.event_type = event_type + self.callback = callback + + self._parent_handle = parent_handle + self._event_handle = _rclpy.rclpy_create_event(event_type, parent_handle) + self._ready_to_take_data = False + self._event_index = None + + # Start Waitable API + def is_ready(self, wait_set): + """Return True if entities are ready in the wait set.""" + if self._event_index is None: + return False + if _rclpy.rclpy_wait_set_is_ready('event', wait_set, self._event_index): + self._ready_to_take_data = True + return self._ready_to_take_data + + def take_data(self): + """Take stuff from lower level so the wait set doesn't immediately wake again.""" + if self._ready_to_take_data: + self._ready_to_take_data = False + return _rclpy.rclpy_take_event( + self._event_handle, self._parent_handle, self.event_type) + return None + + async def execute(self, taken_data): + """Execute work after data has been taken from a ready wait set.""" + if not taken_data: + return + self.callback(taken_data) + + def get_num_entities(self): + """Return number of each type of entity used.""" + return NumberOfEntities(num_events=1) + + def add_to_wait_set(self, wait_set): + """Add entites to wait set.""" + self._event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, self._event_handle) + # End Waitable API + + +class SubscriptionEventCallbacks: + """Container to provide middleware event callbacks for a Subscription.""" + + def __init__( + self, + *, + deadline: Callable[[QoSRequestedDeadlineMissedInfo], None] = None, + liveliness: Callable[[QoSLivelinessChangedInfo], None] = None, + ) -> None: + """ + Constructor. + + :param deadline: A user-defined callback that is called when a topic misses our + requested Deadline. + :param liveliness: A user-defined callback that is called when the Liveliness of + a Publisher on subscribed topic changes. + """ + self.deadline = deadline + self.liveliness = liveliness + + def create_event_handlers( + self, callback_group: CallbackGroup, subscription_handle + ) -> List[QoSEventHandler]: + event_handlers = [] + if self.deadline: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.deadline, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED, + parent_handle=subscription_handle)) + if self.liveliness: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.liveliness, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED, + parent_handle=subscription_handle)) + return event_handlers + + +class PublisherEventCallbacks: + """Container to provide middleware event callbacks for a Publisher.""" + + def __init__( + self, + *, + deadline: Callable[[QoSOfferedDeadlineMissedInfo], None] = None, + liveliness: Callable[[QoSLivelinessLostInfo], None] = None + ) -> None: + """ + Constructor. + + :param deadline: A user-defined callback that is called when the Publisher misses + its offered Deadline. + :param liveliness: A user-defined callback that is called when this Publisher + fails to signal its Liveliness and is reported as not-alive. + """ + self.deadline = deadline + self.liveliness = liveliness + + def create_event_handlers( + self, callback_group: CallbackGroup, publisher_handle + ) -> List[QoSEventHandler]: + event_handlers = [] + if self.deadline: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.deadline, + event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED, + parent_handle=publisher_handle)) + if self.liveliness: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.liveliness, + event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST, + parent_handle=publisher_handle)) + return event_handlers diff --git a/rclpy/rclpy/subscription.py b/rclpy/rclpy/subscription.py index a258f6232..c020c6cc7 100644 --- a/rclpy/rclpy/subscription.py +++ b/rclpy/rclpy/subscription.py @@ -17,6 +17,8 @@ from rclpy.callback_groups import CallbackGroup from rclpy.qos import QoSProfile +from rclpy.qos_event import SubscriptionEventCallbacks + # For documentation only MsgType = TypeVar('MsgType') @@ -33,7 +35,8 @@ def __init__( callback_group: CallbackGroup, qos_profile: QoSProfile, node_handle, - raw: bool + raw: bool, + event_callbacks: SubscriptionEventCallbacks, ) -> None: """ Create a container for a ROS subscription. @@ -66,6 +69,10 @@ def __init__( self.qos_profile = qos_profile self.raw = raw + self.event_callbacks = event_callbacks + self.event_handlers = event_callbacks.create_event_handlers( + callback_group, subscription_handle) + @property def handle(self): return self.__handle diff --git a/rclpy/rclpy/waitable.py b/rclpy/rclpy/waitable.py index 030da2461..a42211ed4 100644 --- a/rclpy/rclpy/waitable.py +++ b/rclpy/rclpy/waitable.py @@ -20,14 +20,19 @@ class NumberOfEntities: 'num_guard_conditions', 'num_timers', 'num_clients', - 'num_services'] + 'num_services', + 'num_events'] - def __init__(self, num_subs=0, num_gcs=0, num_timers=0, num_clients=0, num_services=0): + def __init__( + self, num_subs=0, num_gcs=0, num_timers=0, + num_clients=0, num_services=0, num_events=0 + ): self.num_subscriptions = num_subs self.num_guard_conditions = num_gcs self.num_timers = num_timers self.num_clients = num_clients self.num_services = num_services + self.num_events = num_events def __add__(self, other): result = self.__class__() @@ -38,9 +43,10 @@ def __add__(self, other): return result def __repr__(self): - return '<{0}({1}, {2}, {3}, {4}, {5})>'.format( - self.__class__.__name__, self.num_subscriptions, self.num_guard_conditions, - self.num_timers, self.num_clients, self.num_services) + return '<{0}({1}, {2}, {3}, {4}, {5}, {6})>'.format( + self.__class__.__name__, self.num_subscriptions, + self.num_guard_conditions, self.num_timers, self.num_clients, + self.num_services, self.num_events) class Waitable: diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index 2fbc180d3..14f173dc8 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -37,6 +37,7 @@ #include #include "rclpy_common/common.h" +#include "./_rclpy_qos_event.c" typedef struct { @@ -2315,6 +2316,10 @@ rclpy_destroy_entity(PyObject * Py_UNUSED(self), PyObject * args) pyentity, "rcl_guard_condition_t"); ret = rcl_guard_condition_fini(guard_condition); PyMem_Free(guard_condition); + } else if (PyCapsule_IsValid(pyentity, "rcl_event_t")) { + PyCapsule_SetDestructor(pyentity, NULL); + _destroy_event_capsule(pyentity); + ret = RCL_RET_OK; } else { ret = RCL_RET_ERROR; // to avoid a linter warning PyErr_Format(PyExc_RuntimeError, "'%s' is not a known entity", PyCapsule_GetName(pyentity)); @@ -2387,12 +2392,13 @@ rclpy_wait_set_init(PyObject * Py_UNUSED(self), PyObject * args) unsigned PY_LONG_LONG number_of_timers; unsigned PY_LONG_LONG number_of_clients; unsigned PY_LONG_LONG number_of_services; + unsigned PY_LONG_LONG number_of_events; PyObject * pycontext; if (!PyArg_ParseTuple( - args, "OKKKKKO", &pywait_set, &number_of_subscriptions, + args, "OKKKKKKO", &pywait_set, &number_of_subscriptions, &number_of_guard_conditions, &number_of_timers, - &number_of_clients, &number_of_services, &pycontext)) + &number_of_clients, &number_of_services, &number_of_events, &pycontext)) { return NULL; } @@ -2408,8 +2414,15 @@ rclpy_wait_set_init(PyObject * Py_UNUSED(self), PyObject * args) } rcl_ret_t ret = rcl_wait_set_init( - wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers, - number_of_clients, number_of_services, context, rcl_get_default_allocator()); + wait_set, + number_of_subscriptions, + number_of_guard_conditions, + number_of_timers, + number_of_clients, + number_of_services, + number_of_events, + context, + rcl_get_default_allocator()); if (ret != RCL_RET_OK) { PyErr_Format(PyExc_RuntimeError, "Failed to initialize wait set: %s", rcl_get_error_string().str); @@ -2494,6 +2507,9 @@ rclpy_wait_set_add_entity(PyObject * Py_UNUSED(self), PyObject * args) rcl_guard_condition_t * guard_condition = (rcl_guard_condition_t *)PyCapsule_GetPointer(pyentity, "rcl_guard_condition_t"); ret = rcl_wait_set_add_guard_condition(wait_set, guard_condition, &index); + } else if (0 == strcmp(entity_type, "event")) { + rcl_event_t * event = (rcl_event_t *)PyCapsule_GetPointer(pyentity, "rcl_event_t"); + ret = rcl_wait_set_add_event(wait_set, event, &index); } else { ret = RCL_RET_ERROR; // to avoid a linter warning PyErr_Format(PyExc_RuntimeError, @@ -2559,6 +2575,9 @@ rclpy_wait_set_is_ready(PyObject * Py_UNUSED(self), PyObject * args) } else if (0 == strcmp(entity_type, "guard_condition")) { entities = (void *)wait_set->guard_conditions; num_entities = wait_set->size_of_guard_conditions; + } else if (0 == strcmp(entity_type, "event")) { + entities = (void *)wait_set->events; + num_entities = wait_set->size_of_events; } else { PyErr_Format(PyExc_RuntimeError, "'%s' is not a known entity", entity_type); @@ -4528,6 +4547,11 @@ static PyMethodDef rclpy_methods[] = { "rclpy_create_timer", rclpy_create_timer, METH_VARARGS, "Create a Timer." }, + { + "rclpy_create_event", rclpy_create_event, METH_VARARGS, + "Create an Event." + }, + { "rclpy_create_guard_condition", rclpy_create_guard_condition, METH_VARARGS, "Create a general purpose guard_condition." @@ -4663,6 +4687,10 @@ static PyMethodDef rclpy_methods[] = { "rclpy_take_response", rclpy_take_response, METH_VARARGS, "rclpy_take_response." }, + { + "rclpy_take_event", rclpy_take_event, METH_VARARGS, + "Get the pending data for a ready QoS Event." + }, { "rclpy_ok", rclpy_ok, METH_VARARGS, diff --git a/rclpy/src/rclpy/_rclpy_qos_event.c b/rclpy/src/rclpy/_rclpy_qos_event.c new file mode 100644 index 000000000..4eac52666 --- /dev/null +++ b/rclpy/src/rclpy/_rclpy_qos_event.c @@ -0,0 +1,334 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +typedef union _qos_event_callback_data { + // Subscription events + rmw_requested_deadline_missed_status_t requested_deadline_missed; + rmw_liveliness_changed_status_t liveliness_changed; + // Publisher events + rmw_offered_deadline_missed_status_t offered_deadline_missed; + rmw_liveliness_lost_status_t liveliness_lost; +} _qos_event_callback_data_t; + +typedef PyObject * (* _qos_event_data_filler_function)(_qos_event_callback_data_t *); + +bool +_check_rcl_return(rcl_ret_t ret, const char * error_msg) +{ + if (ret != RCL_RET_OK) { + PyErr_Format(PyExc_RuntimeError, + "%s: %s", error_msg, rcl_get_error_string().str); + rcl_reset_error(); + return false; + } + return true; +} + +void +_destroy_event_capsule(PyObject * pycapsule) +{ + rcl_event_t * event = (rcl_event_t *)PyCapsule_GetPointer(pycapsule, "rcl_event_t"); + rcl_ret_t ret = rcl_event_fini(event); + PyMem_Free(event); + _check_rcl_return(ret, "Failed to fini 'rcl_event_t'"); +} + +bool +_is_pycapsule_rcl_subscription(PyObject * pycapsule) +{ + return PyCapsule_IsValid(pycapsule, "rcl_subscription_t"); +} + +bool +_is_pycapsule_rcl_publisher(PyObject * pycapsule) +{ + return PyCapsule_IsValid(pycapsule, "rcl_publisher_t"); +} + +bool +_is_pycapsule_rcl_event(PyObject * pycapsule) +{ + return PyCapsule_IsValid(pycapsule, "rcl_event_t"); +} + +rcl_subscription_t * +_pycapsule_to_rcl_subscription(PyObject * pycapsule) +{ + return (rcl_subscription_t *)PyCapsule_GetPointer(pycapsule, "rcl_subscription_t"); +} + +rcl_publisher_t * +_pycapsule_to_rcl_publisher(PyObject * pycapsule) +{ + return (rcl_publisher_t *)PyCapsule_GetPointer(pycapsule, "rcl_publisher_t"); +} + +rcl_event_t * +_pycapsule_to_rcl_event(PyObject * pycapsule) +{ + return (rcl_event_t *)PyCapsule_GetPointer(pycapsule, "rcl_event_t"); +} + +rcl_event_t * +_new_zero_initialized_rcl_event() +{ + rcl_event_t * event = (rcl_event_t *)PyMem_Malloc(sizeof(rcl_event_t)); + if (!event) { + PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for event"); + return NULL; + } + *event = rcl_get_zero_initialized_event(); + return event; +} + + +PyObject * _create_py_qos_event(const char * class_name, PyObject * args) +{ + PyObject * pyqos_event_module = NULL; + PyObject * pyqos_event_class = NULL; + PyObject * pyqos_event = NULL; + + pyqos_event_module = PyImport_ImportModule("rclpy.qos_event"); + if (!pyqos_event_module) { + goto cleanup; + } + + pyqos_event_class = PyObject_GetAttrString(pyqos_event_module, class_name); + if (!pyqos_event_class) { + goto cleanup; + } + + pyqos_event = PyObject_CallObject(pyqos_event_class, args); + +cleanup: + Py_XDECREF(pyqos_event_module); + Py_XDECREF(pyqos_event_class); + Py_XDECREF(args); + + return pyqos_event; +} + +PyObject * +_requested_deadline_missed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_requested_deadline_missed_status_t * actual_data = &data->requested_deadline_missed; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSRequestedDeadlineMissedInfo", args); +} + +PyObject * +_liveliness_changed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_liveliness_changed_status_t * actual_data = &data->liveliness_changed; + PyObject * args = Py_BuildValue( + "iiii", + actual_data->alive_count, + actual_data->not_alive_count, + actual_data->alive_count_change, + actual_data->not_alive_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSLivelinessChangedInfo", args); +} + +PyObject * +_offered_deadline_missed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_offered_deadline_missed_status_t * actual_data = &data->offered_deadline_missed; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSOfferedDeadlineMissedInfo", args); +} + +PyObject * +_liveliness_lost_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_liveliness_lost_status_t * actual_data = &data->liveliness_lost; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSLivelinessLostInfo", args); +} + + +_qos_event_data_filler_function +_get_qos_event_data_filler_function_for(PyObject * pyparent, uint32_t event_type) +{ + if (_is_pycapsule_rcl_subscription(pyparent)) { + switch (event_type) { + case RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED: + return &_requested_deadline_missed_to_py_object; + case RCL_SUBSCRIPTION_LIVELINESS_CHANGED: + return &_liveliness_changed_to_py_object; + default: + PyErr_Format(PyExc_ValueError, + "Event type %d for Subscriptions not understood by rclpy.", event_type); + } + } else if (_is_pycapsule_rcl_publisher(pyparent)) { + switch (event_type) { + case RCL_PUBLISHER_OFFERED_DEADLINE_MISSED: + return &_offered_deadline_missed_to_py_object; + case RCL_PUBLISHER_LIVELINESS_LOST: + return &_liveliness_lost_to_py_object; + default: + PyErr_Format(PyExc_ValueError, + "Event type %d for Publishers not understood by rclpy.", event_type); + } + } else { + PyErr_Format(PyExc_TypeError, + "Parent handle was not a valid Publisher or Subscription."); + } + return NULL; +} + +/// Create an event object for QoS event handling. +/** + * This function will create an event handle for the given Subscription or Publisher parent. + * + * Raises MemoryError if the event can't be allocated + * Raises RuntimeError on initialization failure + * Raises TypeError if the capsules are not the correct types + * + * \param[in] pyevent_type Enum value of + * rcl_publisher_event_type_t or rcl_subscription_event_type_t, chosen by the type of pyparent + * \param[in] pyparent Capsule containing the parent Publisher or Subscription + * \return capsule containing rcl_event_t + * \return NULL on failure + */ +static PyObject * +rclpy_create_event(PyObject * Py_UNUSED(self), PyObject * args) +{ + unsigned PY_LONG_LONG pyevent_type; + PyObject * pyparent = NULL; + + rcl_ret_t ret; + rcl_subscription_t * subscription = NULL; + rcl_publisher_t * publisher = NULL; + rcl_event_t * event = NULL; + + PyObject * pyevent = NULL; + + if (!PyArg_ParseTuple(args, "KO", &pyevent_type, &pyparent)) { + return NULL; + } + + if (_is_pycapsule_rcl_subscription(pyparent)) { + subscription = _pycapsule_to_rcl_subscription(pyparent); + } else if (_is_pycapsule_rcl_publisher(pyparent)) { + publisher = _pycapsule_to_rcl_publisher(pyparent); + } else { + PyErr_Format(PyExc_TypeError, "Event parent was not a valid Publisher or Subscription."); + return NULL; + } + + event = _new_zero_initialized_rcl_event(); + if (!event) { + return NULL; + } + + if (subscription) { + ret = rcl_subscription_event_init(event, subscription, pyevent_type); + } else { + ret = rcl_publisher_event_init(event, publisher, pyevent_type); + } + if (!_check_rcl_return(ret, "Failed to initialize event")) { + PyMem_Free(event); + return NULL; + } + + pyevent = PyCapsule_New(event, "rcl_event_t", _destroy_event_capsule); + if (!pyevent) { + ret = rcl_event_fini(event); + PyMem_Free(event); + _check_rcl_return(ret, "Failed to fini 'rcl_event_t'"); + return NULL; + } + + return pyevent; +} + +/// Get a pending QoS event's data +/** + * After having determined that a middleware event is ready, get the callback payload. + * + * Raises RuntimeError on failure to take the event from the middleware + * Raises TypeError if the capsules are not the correct types + * Raises ValueError on unknown event_type argument + * + * \param[in] pyevent Event handle from rclpy_create_event + * \param[in] pyevent_type Enum value of + * rcl_publisher_event_type_t or rcl_subscription_event_type_t, chosen by the type of pyparent + * \param[in] pyparent Capsule containing the parent Publisher or Subscription + * \return Python object from rclpy.qos_event containing callback data + * \return NULL on failure + */ +static PyObject * +rclpy_take_event(PyObject * Py_UNUSED(self), PyObject * args) +{ + // Arguments + PyObject * pyevent = NULL; + PyObject * pyparent = NULL; + unsigned PY_LONG_LONG pyevent_type; + + // Type conversion + rcl_ret_t ret; + rcl_event_t * event = NULL; + PyObject * pyqos_event = NULL; + _qos_event_callback_data_t event_data; + _qos_event_data_filler_function event_filler = NULL; + + if (!PyArg_ParseTuple(args, "OOK", &pyevent, &pyparent, &pyevent_type)) { + return NULL; + } + + if (!_is_pycapsule_rcl_event(pyevent)) { + PyErr_Format(PyExc_TypeError, "Capsule was not a valid rcl_event_t"); + return NULL; + } + event = _pycapsule_to_rcl_event(pyevent); + + event_filler = _get_qos_event_data_filler_function_for(pyparent, pyevent_type); + if (!event_filler) { + return NULL; + } + + ret = rcl_take_event(event, &event_data); + if (!_check_rcl_return(ret, "Failed to take event")) { + return NULL; + } + + pyqos_event = event_filler(&event_data); + if (!pyqos_event) { + return NULL; + } + return pyqos_event; +} diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py new file mode 100644 index 000000000..b9e3f776c --- /dev/null +++ b/rclpy/test/test_qos_event.py @@ -0,0 +1,233 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import Mock + +import rclpy +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.qos_event import PublisherEventCallbacks +from rclpy.qos_event import QoSLivelinessChangedInfo +from rclpy.qos_event import QoSLivelinessLostInfo +from rclpy.qos_event import QoSOfferedDeadlineMissedInfo +from rclpy.qos_event import QoSPublisherEventType +from rclpy.qos_event import QoSRequestedDeadlineMissedInfo +from rclpy.qos_event import QoSSubscriptionEventType +from rclpy.qos_event import SubscriptionEventCallbacks + +from test_msgs.msg import Empty as EmptyMsg + + +class TestQoSEvent(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.context = rclpy.context.Context() + rclpy.init(context=cls.context) + cls.node = rclpy.create_node('TestQoSEvent', namespace='/rclpy/test', context=cls.context) + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown(context=cls.context) + + def test_publisher_constructor(self): + callbacks = PublisherEventCallbacks() + liveliness_callback = Mock() + deadline_callback = Mock() + + # No arg + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + self.assertEqual(len(publisher.event_handlers), 0) + self.node.destroy_publisher(publisher) + + # Arg with no callbacks + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 0) + self.node.destroy_publisher(publisher) + + # Arg with one of the callbacks + callbacks.deadline = deadline_callback + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 1) + self.node.destroy_publisher(publisher) + + # Arg with both callbacks + callbacks.liveliness = liveliness_callback + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 2) + self.node.destroy_publisher(publisher) + + def test_subscription_constructor(self): + callbacks = SubscriptionEventCallbacks() + liveliness_callback = Mock() + deadline_callback = Mock() + message_callback = Mock() + + # No arg + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback) + self.assertEqual(len(subscription.event_handlers), 0) + self.node.destroy_subscription(subscription) + + # Arg with no callbacks + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 0) + self.node.destroy_subscription(subscription) + + # Arg with one of the callbacks + callbacks.deadline = deadline_callback + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 1) + self.node.destroy_subscription(subscription) + + # Arg with both callbacks + callbacks.liveliness = liveliness_callback + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 2) + self.node.destroy_subscription(subscription) + + def test_event_create_destroy(self): + # Publisher event types + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + + event_handle = _rclpy.rclpy_create_event( + QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED, + publisher.publisher_handle) + self.assertIsNotNone(event_handle) + _rclpy.rclpy_destroy_entity(event_handle) + + event_handle = _rclpy.rclpy_create_event( + QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST, + publisher.publisher_handle) + self.assertIsNotNone(event_handle) + _rclpy.rclpy_destroy_entity(event_handle) + + self.node.destroy_publisher(publisher) + + # Subscription event types + message_callback = Mock() + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback) + + event_handle = _rclpy.rclpy_create_event( + QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED, + subscription.subscription_handle) + self.assertIsNotNone(event_handle) + _rclpy.rclpy_destroy_entity(event_handle) + + event_handle = _rclpy.rclpy_create_event( + QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED, + subscription.subscription_handle) + self.assertIsNotNone(event_handle) + _rclpy.rclpy_destroy_entity(event_handle) + + self.node.destroy_subscription(subscription) + + def test_call_publisher_rclpy_event_apis(self): + # Go through the exposed apis and ensure that things don't explode when called + # Make no assumptions about being able to actually receive the events + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() + _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 2, self.context.handle) + + deadline_event_handle = _rclpy.rclpy_create_event( + QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED, + publisher.publisher_handle) + deadline_event_index = _rclpy.rclpy_wait_set_add_entity( + 'event', wait_set, deadline_event_handle) + self.assertIsNotNone(deadline_event_index) + + liveliness_event_handle = _rclpy.rclpy_create_event( + QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST, + publisher.publisher_handle) + liveliness_event_index = _rclpy.rclpy_wait_set_add_entity( + 'event', wait_set, liveliness_event_handle) + self.assertIsNotNone(liveliness_event_index) + + # We live in our own namespace and have created no other participants, so + # there can't be any of these events. + _rclpy.rclpy_wait(wait_set, 0) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, deadline_event_index)) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, liveliness_event_index)) + + # Calling take data even though not ready should provide me an empty initialized message + # Tests data conversion utilities in C side + event_data = _rclpy.rclpy_take_event( + deadline_event_handle, + publisher.publisher_handle, + QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) + self.assertIsInstance(event_data, QoSOfferedDeadlineMissedInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + + event_data = _rclpy.rclpy_take_event( + liveliness_event_handle, + publisher.publisher_handle, + QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) + self.assertIsInstance(event_data, QoSLivelinessLostInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + + self.node.destroy_publisher(publisher) + + def test_call_subscription_rclpy_event_apis(self): + # Go through the exposed apis and ensure that things don't explode when called + # Make no assumptions about being able to actually receive the events + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', Mock()) + wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() + _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 2, self.context.handle) + + deadline_event_handle = _rclpy.rclpy_create_event( + QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED, + subscription.subscription_handle) + deadline_event_index = _rclpy.rclpy_wait_set_add_entity( + 'event', wait_set, deadline_event_handle) + self.assertIsNotNone(deadline_event_index) + + liveliness_event_handle = _rclpy.rclpy_create_event( + QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED, + subscription.subscription_handle) + liveliness_event_index = _rclpy.rclpy_wait_set_add_entity( + 'event', wait_set, liveliness_event_handle) + self.assertIsNotNone(liveliness_event_index) + + # We live in our own namespace and have created no other participants, so + # there can't be any of these events. + _rclpy.rclpy_wait(wait_set, 0) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, deadline_event_index)) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, liveliness_event_index)) + + # Calling take data even though not ready should provide me an empty initialized message + # Tests data conversion utilities in C side + event_data = _rclpy.rclpy_take_event( + deadline_event_handle, + subscription.subscription_handle, + QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) + self.assertIsInstance(event_data, QoSRequestedDeadlineMissedInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + + event_data = _rclpy.rclpy_take_event( + liveliness_event_handle, + subscription.subscription_handle, + QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED) + self.assertIsInstance(event_data, QoSLivelinessChangedInfo) + self.assertEqual(event_data.alive_count, 0) + self.assertEqual(event_data.alive_count_change, 0) + self.assertEqual(event_data.not_alive_count, 0) + self.assertEqual(event_data.not_alive_count_change, 0) + + self.node.destroy_subscription(subscription) diff --git a/rclpy/test/test_waitable.py b/rclpy/test/test_waitable.py index 7d29f70bd..734bc24ca 100644 --- a/rclpy/test/test_waitable.py +++ b/rclpy/test/test_waitable.py @@ -380,21 +380,23 @@ def test_waitable_with_mutually_exclusive_callback_group(self): class TestNumberOfEntities(unittest.TestCase): def test_add(self): - n1 = NumberOfEntities(1, 2, 3, 4, 5) - n2 = NumberOfEntities(10, 20, 30, 40, 50) + n1 = NumberOfEntities(1, 2, 3, 4, 5, 6) + n2 = NumberOfEntities(10, 20, 30, 40, 50, 60) n = n1 + n2 assert n.num_subscriptions == 11 assert n.num_guard_conditions == 22 assert n.num_timers == 33 assert n.num_clients == 44 assert n.num_services == 55 + assert n.num_events == 66 def test_add_assign(self): - n1 = NumberOfEntities(1, 2, 3, 4, 5) - n2 = NumberOfEntities(10, 20, 30, 40, 50) + n1 = NumberOfEntities(1, 2, 3, 4, 5, 6) + n2 = NumberOfEntities(10, 20, 30, 40, 50, 60) n1 += n2 assert n1.num_subscriptions == 11 assert n1.num_guard_conditions == 22 assert n1.num_timers == 33 assert n1.num_clients == 44 assert n1.num_services == 55 + assert n1.num_events == 66