diff --git a/README.md b/README.md index d90b2050..7bcb8efc 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ variables. - [RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY) - [RMW_CONNEXT_INITIAL_PEERS](#RMW_CONNEXT_INITIAL_PEERS) - [RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE](#RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE) +- [RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY) - [RMW_CONNEXT_REQUEST_REPLY_MAPPING](#RMW_CONNEXT_REQUEST_REPLY_MAPPING) - [RMW_CONNEXT_UDP_INTERFACE](#RMW_CONNEXT_UDP_INTERFACE) - [RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE](#RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE) @@ -275,6 +276,26 @@ In particular, when this mode is enabled, `rmw_connextdds` will revert to adding a suffix (`_`) to the end of the names of the attributes of the ROS2 data types propagated via DDS discovery. +### RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY + +Control how `rmw_connextdds` will override the default DomainParticipantQos obtained +from Connext. + +If this variable is unspecified, or set to `all`, then `rmw_connextdds` will modify +the default DomainParticipantQos with settings derived from ROS 2 options (e.g. +"localhost only", or "node enclave"), and some additional optimizations meant to +improve the out of the box experiene (e.g. speed up endpoint discovery, and increase +the size of type information shared via discovery). + +If the variable is set to `basic`, then only those settings associated with ROS 2 +options will be modified. + +If the variable is set to `never`, then no settings will be modified and the +DomainParticipantQos will be used as is. + +Note that values `basic` and `never` will disable the same endpoint discovery +optimizations controlled by [RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY](#RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY). + ### RMW_CONNEXT_REQUEST_REPLY_MAPPING The [DDS-RPC specification](https://www.omg.org/spec/DDS-RPC/About-DDS-RPC/) diff --git a/rmw_connextdds_common/include/rmw_connextdds/context.hpp b/rmw_connextdds_common/include/rmw_connextdds/context.hpp index 470bc80a..a53c7d46 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/context.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/context.hpp @@ -89,6 +89,25 @@ struct rmw_context_impl_t bool optimize_large_data{true}; #endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */ + enum class participant_qos_override_policy_t + { + // Always override the default DomainParticipantQoS obtained at runtime from + // Connext with RMW-specific configuration. This will include settings derived + // from ROS 2 configuration parameters (e.g. "localhost_only", or "enclave"), + // but also some additional configurations that the RMW performs arbitrarly + // to improve the out of the box experience. Note that some of these customizations + // can also be disabled individually (e.g. fast endpoint discovery). + All, + // Only perform basic modifications on the default DomainParticipantQos value + // based on ROS 2 configuration parameters (e.g. "localhost only", and "enclave"). + // All other RMW-specific customizations will not be applied. + Basic, + // Use the default DomainParticipantQoS returned by Connext without any modification. + Never, + }; + + participant_qos_override_policy_t participant_qos_override_policy; + enum class endpoint_qos_override_policy_t { // Use default QoS policy got from the DDS qos profile file applying topic filters @@ -106,6 +125,8 @@ struct rmw_context_impl_t endpoint_qos_override_policy_t endpoint_qos_override_policy; std::regex endpoint_qos_override_policy_topics_regex; + struct DDS_StringSeq initial_peers = DDS_SEQUENCE_INITIALIZER; + /* Participant reference count*/ size_t node_count{0}; std::mutex initialization_mutex; diff --git a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp index 41bac860..b5f1022f 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp @@ -94,8 +94,12 @@ #ifndef RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY #define RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY "RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY" -#endif /* RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES */ +#endif /* RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY */ +#ifndef RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY +#define RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY \ + "RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY" +#endif /* RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY */ /****************************************************************************** * DDS Implementation diff --git a/rmw_connextdds_common/src/common/rmw_context.cpp b/rmw_connextdds_common/src/common/rmw_context.cpp index c4ff62a7..aa1f9875 100644 --- a/rmw_connextdds_common/src/common/rmw_context.cpp +++ b/rmw_connextdds_common/src/common/rmw_context.cpp @@ -91,35 +91,14 @@ rmw_connextdds_initialize_participant_qos( return RMW_RET_ERROR; } - /* Lookup and configure initial peer from environment */ - const char * initial_peers = nullptr; - const char * lookup_rc = - rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers); - - if (nullptr != lookup_rc || nullptr == initial_peers) { - RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to lookup from environment: " - "var=%s, " - "rc=%s ", - RMW_CONNEXT_ENV_INITIAL_PEERS, - lookup_rc) - return RMW_RET_ERROR; - } - - if ('\0' != initial_peers[0]) { - rmw_ret_t rc = rmw_connextdds_parse_string_list( - initial_peers, - &dp_qos.discovery.initial_peers, - ',' /* delimiter */, - true /* trim_elements */, - false /* allow_empty_elements */, - false /* append_values */); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR_A( - "failed to parse initial peers: '%s'", initial_peers) - return rc; + if (ctx->participant_qos_override_policy == + rmw_context_impl_t::participant_qos_override_policy_t::All && + DDS_StringSeq_get_length(&ctx->initial_peers) > 0) + { + if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence") + return RMW_RET_ERROR; } - RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers) } return RMW_RET_OK; @@ -175,7 +154,6 @@ rmw_context_impl_t::initialize_node( return RMW_RET_OK; } - rmw_ret_t rmw_context_impl_t::initialize_participant(const bool localhost_only) { @@ -183,52 +161,6 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only) this->localhost_only = localhost_only; - /* Lookup RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES env variable.*/ - const char * endpoint_qos_policy = nullptr; - const char * lookup_rc = rcutils_get_env( - RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy); - - if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) { - RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to lookup from environment: " - "var=%s, " - "rc=%s ", - RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, - lookup_rc) - return RMW_RET_ERROR; - } - - this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always; - const char dds_topic_policy_prefix[] = "dds_topics: "; - const char never_policy[] = "never"; - const char always_policy[] = "always"; - if ( - 0 == strncmp( - endpoint_qos_policy, dds_topic_policy_prefix, sizeof(dds_topic_policy_prefix) - 1u)) - { - this->endpoint_qos_override_policy = - rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics; - try { - this->endpoint_qos_override_policy_topics_regex = - &endpoint_qos_policy[sizeof(dds_topic_policy_prefix) - 1u]; - } catch (std::regex_error & err) { - RMW_CONNEXT_LOG_ERROR_A_SET( - "regex expression provided in {%s} environment variable is invalid: %s\n", - RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, - err.what()); - return RMW_RET_ERROR; - } - } else if (0 == strcmp(endpoint_qos_policy, never_policy)) { - this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never; - } else if (endpoint_qos_policy[0] != '\0' && strcmp(endpoint_qos_policy, always_policy) != 0) { - RMW_CONNEXT_LOG_ERROR_A_SET( - "Environment variable {%s} has an unexpected value {%s}. " - "Allowed values are {always}, {never} or {dds_topics: }.\n", - RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, - endpoint_qos_policy); - return RMW_RET_ERROR; - } - if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { RMW_CONNEXT_LOG_ERROR("DDS DomainParticipantFactory not initialized") return RMW_RET_ERROR; @@ -708,6 +640,69 @@ rmw_api_connextdds_init_options_fini(rmw_init_options_t * init_options) return ret; } +static +rmw_ret_t +rmw_connextdds_parse_participant_qos_override_policy( + const char * const user_input, + rmw_context_impl_t::participant_qos_override_policy_t & policy) +{ + static const char pfx_never[] = "never"; + static const char pfx_all[] = "all"; + static const char pfx_basic[] = "basic"; + + policy = rmw_context_impl_t::participant_qos_override_policy_t::All; + + if (0 == strcmp(user_input, pfx_never)) { + policy = rmw_context_impl_t::participant_qos_override_policy_t::Never; + } else if (0 == strcmp(user_input, pfx_basic)) { + policy = rmw_context_impl_t::participant_qos_override_policy_t::Basic; + } else if (user_input[0] != '\0' && strcmp(user_input, pfx_all) != 0) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "unexpected value for participant qos override policy. " + "Allowed values are {all}, {basic}, or {never}: %s", + user_input); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +static +rmw_ret_t +rmw_connextdds_parse_endpoint_qos_override_policy( + const char * const user_input, + rmw_context_impl_t::endpoint_qos_override_policy_t & policy, + std::regex & policy_regex) +{ + static const char pfx_dds_topics[] = "dds_topics: "; + static const size_t pfx_dds_topics_len = sizeof(pfx_dds_topics) - 1u; + static const char pfx_never[] = "never"; + static const char pfx_always[] = "always"; + + policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always; + + if (0 == strncmp(user_input, pfx_dds_topics, pfx_dds_topics_len)) { + policy = rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics; + try { + policy_regex = &user_input[pfx_dds_topics_len]; + } catch (std::regex_error & err) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to parse regex for endpoint qos override policy: %s", + err.what()); + return RMW_RET_ERROR; + } + } else if (0 == strcmp(user_input, pfx_never)) { + policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never; + } else if (user_input[0] != '\0' && strcmp(user_input, pfx_always) != 0) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "unexpected value for endpoint qos override policy. " + "Allowed values are {always}, {never} or {dds_topics: }: %s", + user_input); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} rmw_ret_t rmw_api_connextdds_init( @@ -810,6 +805,56 @@ rmw_api_connextdds_init( } ctx->use_default_publish_mode = '\0' != use_default_publish_mode_env[0]; + // Check if the user specified a custom override policy for participant qos. + const char * participant_qos_policy = nullptr; + lookup_rc = rcutils_get_env( + RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY, &participant_qos_policy); + + if (nullptr != lookup_rc || nullptr == participant_qos_policy) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to lookup from environment: " + "var=%s, " + "rc=%s ", + RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY, + lookup_rc) + return RMW_RET_ERROR; + } + + rc = rmw_connextdds_parse_participant_qos_override_policy( + participant_qos_policy, ctx->participant_qos_override_policy); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to parse value for environment variable {%s}", + RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY); + return RMW_RET_ERROR; + } + + // Check if the user specified a custom override policy for endpoint qos. + const char * endpoint_qos_policy = nullptr; + lookup_rc = rcutils_get_env( + RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy); + + if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to lookup from environment: " + "var=%s, " + "rc=%s ", + RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, + lookup_rc) + return RMW_RET_ERROR; + } + + rc = rmw_connextdds_parse_endpoint_qos_override_policy( + endpoint_qos_policy, + ctx->endpoint_qos_override_policy, + ctx->endpoint_qos_override_policy_topics_regex); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to parse value for environment variable {%s}", + RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY); + return RMW_RET_ERROR; + } + // Check if we should run in "compatibility mode" with Cyclone DDS. const char * cyclone_compatible_env = nullptr; lookup_rc = rcutils_get_env( @@ -922,6 +967,37 @@ rmw_api_connextdds_init( ctx->optimize_large_data = '\0' == disable_optimize_large_data_env[0]; #endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */ + /* Lookup and configure initial peer from environment */ + const char * initial_peers = nullptr; + lookup_rc = + rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers); + + if (nullptr != lookup_rc || nullptr == initial_peers) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to lookup from environment: " + "var=%s, " + "rc=%s ", + RMW_CONNEXT_ENV_INITIAL_PEERS, + lookup_rc) + return RMW_RET_ERROR; + } + + if ('\0' != initial_peers[0]) { + rmw_ret_t rc = rmw_connextdds_parse_string_list( + initial_peers, + &ctx->initial_peers, + ',' /* delimiter */, + true /* trim_elements */, + false /* allow_empty_elements */, + false /* append_values */); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR_A( + "failed to parse initial peers: '%s'", initial_peers) + return rc; + } + RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers) + } + if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory") diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index dcf8208a..3308cdfe 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -88,78 +88,79 @@ rmw_connextdds_initialize_participant_qos_impl( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const dp_qos) { - if (ctx->localhost_only) { - if (DDS_RETCODE_OK != - DDS_PropertyQosPolicyHelper_assert_property( - &dp_qos->property, - "dds.transport.UDPv4.builtin.parent.allow_interfaces", - RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS, - DDS_BOOLEAN_FALSE /* propagate */)) - { - RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to assert property on participant: %s", - "dds.transport.UDPv4.builtin.parent.allow_interfaces") - return RMW_RET_ERROR; - } - } + switch (ctx->participant_qos_override_policy) { + case rmw_context_impl_t::participant_qos_override_policy_t::All: + case rmw_context_impl_t::participant_qos_override_policy_t::Basic: + { + // Parse and apply QoS parameters derived from ROS 2 configuration options. + + if (ctx->localhost_only) { + if (DDS_RETCODE_OK != + DDS_PropertyQosPolicyHelper_assert_property( + &dp_qos->property, + "dds.transport.UDPv4.builtin.parent.allow_interfaces", + RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS, + DDS_BOOLEAN_FALSE /* propagate */)) + { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to assert property on participant: %s", + "dds.transport.UDPv4.builtin.parent.allow_interfaces") + return RMW_RET_ERROR; + } + } -#if RMW_CONNEXT_DONT_IGNORE_LOOPBACK_INTERFACE - // TODO(asorbini) Setting this property causes the middleware to send data - // over loopback, even if a better transport is available (e.g. shmem). - // This property is added to improve interoperability with other vendors. - // For this reason, it might be better to make this an optional behavior, - // based on an environment variable, and have the default not set it, - // to improve OOTB performance. - if (DDS_RETCODE_OK != - DDS_PropertyQosPolicyHelper_assert_property( - &dp_qos->property, - "dds.transport.UDPv4.builtin.ignore_loopback_interface", - "0", - DDS_BOOLEAN_FALSE /* propagate */)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to assert property on participant: " - "dds.transport.UDPv4.builtin.ignore_loopback_interface") - return RMW_RET_ERROR; - } -#endif /* RMW_CONNEXT_DONT_IGNORE_LOOPBACK_INTERFACE */ + const size_t user_data_len_in = + DDS_OctetSeq_get_length(&dp_qos->user_data.value); - const size_t user_data_len_in = - DDS_OctetSeq_get_length(&dp_qos->user_data.value); + if (user_data_len_in != 0) { + RMW_CONNEXT_LOG_WARNING( + "DomainParticipant's USER_DATA will be overwritten to " + "propagate node enclave") + } - if (user_data_len_in != 0) { - RMW_CONNEXT_LOG_WARNING( - "DomainParticipant's USER_DATA will be overwritten to " - "propagate node enclave") - } + const char * const user_data_fmt = "enclave=%s;"; - const char * const user_data_fmt = "enclave=%s;"; + const int user_data_len = + std::snprintf( + nullptr, 0, user_data_fmt, ctx->base->options.enclave) + 1; - const int user_data_len = - std::snprintf( - nullptr, 0, user_data_fmt, ctx->base->options.enclave) + 1; + if (!DDS_OctetSeq_ensure_length( + &dp_qos->user_data.value, user_data_len, user_data_len)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to set user_data length") + return RMW_RET_ERROR; + } - if (!DDS_OctetSeq_ensure_length( - &dp_qos->user_data.value, user_data_len, user_data_len)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to set user_data length") - return RMW_RET_ERROR; - } + char * const user_data_ptr = + reinterpret_cast( + DDS_OctetSeq_get_contiguous_buffer(&dp_qos->user_data.value)); - char * const user_data_ptr = - reinterpret_cast( - DDS_OctetSeq_get_contiguous_buffer(&dp_qos->user_data.value)); + const int user_data_rc = + std::snprintf( + user_data_ptr, + user_data_len, + user_data_fmt, + ctx->base->options.enclave); - const int user_data_rc = - std::snprintf( - user_data_ptr, - user_data_len, - user_data_fmt, - ctx->base->options.enclave); + if (user_data_rc < 0 || user_data_rc != user_data_len - 1) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set user_data") + return RMW_RET_ERROR; + } + break; + } + default: + { + // No customization of DomainParticipantQos request, return immediately. + RMW_CONNEXT_LOG_DEBUG("using default Connext's DomainParticipantQos") + return RMW_RET_OK; + } + } - if (user_data_rc < 0 || user_data_rc != user_data_len - 1) { - RMW_CONNEXT_LOG_ERROR_SET("failed to set user_data") - return RMW_RET_ERROR; + if (rmw_context_impl_t::participant_qos_override_policy_t::Basic == + ctx->participant_qos_override_policy) + { + RMW_CONNEXT_LOG_DEBUG("applied only ROS 2 configuration to DomainParticipantQos") + return RMW_RET_OK; } #if RMW_CONNEXT_RTPS_AUTO_ID_FROM_UUID