Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add convenience name translations for use by commandline utilities etc. #352

Merged
merged 3 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions rclpy/rclpy/qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from enum import IntEnum
import warnings

Expand Down Expand Up @@ -228,53 +229,78 @@ def __eq__(self, other):
for slot in self.__slots__)


class QoSHistoryPolicy(IntEnum):
class QoSPolicyEnum(IntEnum):
"""
Base for QoS Policy enumerations.

Provides helper function to filter keys for utilities.
"""

@classmethod
def short_keys(cls):
return [k for k in cls.__members__.keys() if not k.startswith('RMW')]


class QoSHistoryPolicy(QoSPolicyEnum):
"""
Enum for QoS History settings.

This enum matches the one defined in rmw/types.h
"""

RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT = 0
system_default = RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT
RMW_QOS_POLICY_HISTORY_KEEP_LAST = 1
keep_last = RMW_QOS_POLICY_HISTORY_KEEP_LAST
RMW_QOS_POLICY_HISTORY_KEEP_ALL = 2
keep_all = RMW_QOS_POLICY_HISTORY_KEEP_ALL


class QoSReliabilityPolicy(IntEnum):
class QoSReliabilityPolicy(QoSPolicyEnum):
"""
Enum for QoS Reliability settings.

This enum matches the one defined in rmw/types.h
"""

RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT = 0
system_default = RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT
RMW_QOS_POLICY_RELIABILITY_RELIABLE = 1
reliable = RMW_QOS_POLICY_RELIABILITY_RELIABLE
RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT = 2
best_effort = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT


class QoSDurabilityPolicy(IntEnum):
class QoSDurabilityPolicy(QoSPolicyEnum):
"""
Enum for QoS Durability settings.

This enum matches the one defined in rmw/types.h
"""

RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT = 0
system_default = RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT
RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL = 1
transient_local = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
RMW_QOS_POLICY_DURABILITY_VOLATILE = 2
volatile = RMW_QOS_POLICY_DURABILITY_VOLATILE


class QoSLivelinessPolicy(IntEnum):
class QoSLivelinessPolicy(QoSPolicyEnum):
"""
Enum for QoS Liveliness settings.

This enum matches the one defined in rmw/types.h
"""

RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT = 0
system_default = RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT
RMW_QOS_POLICY_LIVELINESS_AUTOMATIC = 1
automatic = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC
RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE = 2
manual_by_node = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE
RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC = 3
manual_by_topic = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC


class DeprecatedQoSProfile(QoSProfile):
Expand Down Expand Up @@ -308,3 +334,12 @@ def __init__(self, qos_profile: QoSProfile, profile_name: str):
'qos_profile_parameter_events')
qos_profile_action_status_default = _rclpy_action.rclpy_action_get_rmw_qos_profile(
'rcl_action_qos_profile_status_default')


class QoSPresetProfiles(Enum):
system_default = qos_profile_system_default
sensor_data = qos_profile_sensor_data
services_default = qos_profile_services_default
parameters = qos_profile_parameters
parameter_events = qos_profile_parameter_events
action_status_default = qos_profile_action_status_default
16 changes: 16 additions & 0 deletions rclpy/test/test_qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

from rclpy.duration import Duration
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import qos_profile_system_default
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSLivelinessPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy

Expand Down Expand Up @@ -120,3 +122,17 @@ def test_deprecation_warnings(self):
# No deprecation if only depth
QoSProfile(depth=1)
assert len(w) == 0, str(w[-1].message)

def test_policy_short_names(self):
# Full test on History to show the mechanism works
assert QoSHistoryPolicy.short_keys() == ['system_default', 'keep_last', 'keep_all']
assert (
QoSHistoryPolicy['system_default'] ==
QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT)
assert QoSHistoryPolicy['keep_all'] == QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_ALL
assert QoSHistoryPolicy['keep_last'] == QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST

def test_preset_profiles(self):
# Make sure the Enum does what we expect
assert QoSPresetProfiles.system_default.value == qos_profile_system_default
assert QoSPresetProfiles['system_default'] == QoSPresetProfiles.system_default