Skip to content

Commit

Permalink
Unique network flows (#296)
Browse files Browse the repository at this point in the history
* Demonstrate unique network flows feature

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Use new unique network flow option

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Modify to match renamed API

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Rename files for consistency

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Update build configuration

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Catch exception on failure to create subscriber

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

Co-authored-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>
  • Loading branch information
anamud and Ananya Muddukrishna authored Apr 5, 2021
1 parent 654358e commit c378340
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 0 deletions.
4 changes: 4 additions & 0 deletions rclcpp/topics/minimal_publisher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ ament_target_dependencies(publisher_lambda rclcpp std_msgs)
add_executable(publisher_member_function member_function.cpp)
ament_target_dependencies(publisher_member_function rclcpp std_msgs)

add_executable(publisher_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp)
ament_target_dependencies(publisher_member_function_with_unique_network_flow_endpoints rclcpp std_msgs)

add_executable(publisher_not_composable not_composable.cpp)
ament_target_dependencies(publisher_not_composable rclcpp std_msgs)

install(TARGETS
publisher_lambda
publisher_member_function
publisher_member_function_with_unique_network_flow_endpoints
publisher_not_composable
DESTINATION lib/${PROJECT_NAME}
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/publisher_options.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

class MinimalPublisherWithUniqueNetworkFlowEndpoints : public rclcpp::Node
{
public:
MinimalPublisherWithUniqueNetworkFlowEndpoints()
: Node("minimal_publisher_with_unique_network_flow_endpoints"), count_1_(0), count_2_(0)
{
// Create publisher with unique network flow endpoints
// Enable unique network flow endpoints via options
auto options_1 = rclcpp::PublisherOptions();
options_1.require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED;
publisher_1_ = this->create_publisher<std_msgs::msg::String>("topic_1", 10, options_1);
timer_1_ = this->create_wall_timer(
500ms, std::bind(&MinimalPublisherWithUniqueNetworkFlowEndpoints::timer_1_callback, this));

// Create publisher without unique network flow endpoints
// Unique network flow endpoints are disabled in default options
auto options_2 = rclcpp::PublisherOptions();
publisher_2_ = this->create_publisher<std_msgs::msg::String>("topic_2", 10);
timer_2_ = this->create_wall_timer(
1000ms, std::bind(&MinimalPublisherWithUniqueNetworkFlowEndpoints::timer_2_callback, this));

// Get network flow endpoints
auto network_flow_endpoints_1 = publisher_1_->get_network_flow_endpoints();
auto network_flow_endpoints_2 = publisher_2_->get_network_flow_endpoints();

// Print network flow endpoints
print_network_flow_endpoints(network_flow_endpoints_1);
print_network_flow_endpoints(network_flow_endpoints_2);
}

private:
void timer_1_callback()
{
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(count_1_++);

RCLCPP_INFO(
this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_1_->publish(message);
}
void timer_2_callback()
{
auto message = std_msgs::msg::String();
message.data = "Hej, världen! " + std::to_string(count_2_++);

RCLCPP_INFO(
this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_2_->publish(message);
}
/// Print network flow endpoints in JSON-like format
void print_network_flow_endpoints(
const std::vector<rclcpp::NetworkFlowEndpoint> & network_flow_endpoints) const
{
std::ostringstream stream;
stream << "{\"networkFlowEndpoints\": [";
bool comma_skip = true;
for (auto network_flow_endpoint : network_flow_endpoints) {
if (comma_skip) {
comma_skip = false;
} else {
stream << ",";
}
stream << network_flow_endpoint;
}
stream << "]}";
RCLCPP_INFO(
this->get_logger(), "%s",
stream.str().c_str());
}
rclcpp::TimerBase::SharedPtr timer_1_;
rclcpp::TimerBase::SharedPtr timer_2_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_1_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_2_;
size_t count_1_;
size_t count_2_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalPublisherWithUniqueNetworkFlowEndpoints>());
rclcpp::shutdown();
return 0;
}
4 changes: 4 additions & 0 deletions rclcpp/topics/minimal_subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ ament_target_dependencies(subscriber_member_function rclcpp std_msgs)
add_executable(subscriber_member_function_with_topic_statistics member_function_with_topic_statistics.cpp)
ament_target_dependencies(subscriber_member_function_with_topic_statistics rclcpp std_msgs)

add_executable(subscriber_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp)
ament_target_dependencies(subscriber_member_function_with_unique_network_flow_endpoints rclcpp std_msgs)

add_executable(subscriber_not_composable not_composable.cpp)
ament_target_dependencies(subscriber_not_composable rclcpp std_msgs)

install(TARGETS
subscriber_lambda
subscriber_member_function
subscriber_member_function_with_topic_statistics
subscriber_member_function_with_unique_network_flow_endpoints
subscriber_not_composable
DESTINATION lib/${PROJECT_NAME})

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/subscription_options.hpp"
#include "std_msgs/msg/string.hpp"

using std::placeholders::_1;

class MinimalSubscriberWithUniqueNetworkFlowEndpoints : public rclcpp::Node
{
public:
MinimalSubscriberWithUniqueNetworkFlowEndpoints()
: Node("minimal_subscriber_with_unique_network_flow_endpoints")
{
try {
// Create subscription with unique network flow endpoints
// Enable unique network flow endpoints via options
// Since option is strict, expect exception
auto options_1 = rclcpp::SubscriptionOptions();
options_1.require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED;

subscription_1_ = this->create_subscription<std_msgs::msg::String>(
"topic_1", 10, std::bind(
&MinimalSubscriberWithUniqueNetworkFlowEndpoints::topic_1_callback, this,
_1), options_1);

// Create subscription without unique network flow endpoints
// Unique network flow endpoints are disabled by default
auto options_2 = rclcpp::SubscriptionOptions();
subscription_2_ = this->create_subscription<std_msgs::msg::String>(
"topic_2", 10, std::bind(
&MinimalSubscriberWithUniqueNetworkFlowEndpoints::topic_2_callback, this,
_1), options_2);

// Get network flow endpoints
auto network_flow_endpoints_1 = subscription_1_->get_network_flow_endpoints();
auto network_flow_endpoints_2 = subscription_2_->get_network_flow_endpoints();

// Check if network flow endpoints are unique
for (auto network_flow_endpoint_1 : network_flow_endpoints_1) {
for (auto network_flow_endpoint_2 : network_flow_endpoints_2) {
if (network_flow_endpoint_1 == network_flow_endpoint_2) {
RCLCPP_ERROR(
this->get_logger(), "Network flow endpoints across subscriptions are not unique");
break;
}
}
}

// Print network flow endpoints
print_network_flow_endpoints(network_flow_endpoints_1);
print_network_flow_endpoints(network_flow_endpoints_2);
} catch (const rclcpp::exceptions::RCLError & e) {
RCLCPP_ERROR(
this->get_logger(),
"Error: %s",
e.what());
}
}

private:
void topic_1_callback(const std_msgs::msg::String::SharedPtr msg) const
{
RCLCPP_INFO(this->get_logger(), "Topic 1 news: '%s'", msg->data.c_str());
}
void topic_2_callback(const std_msgs::msg::String::SharedPtr msg) const
{
RCLCPP_INFO(this->get_logger(), "Topic 2 news: '%s'", msg->data.c_str());
}
/// Print network flow endpoints in JSON-like format
void print_network_flow_endpoints(
const std::vector<rclcpp::NetworkFlowEndpoint> & network_flow_endpoints) const
{
std::ostringstream stream;
stream << "{\"networkFlowEndpoints\": [";
bool comma_skip = true;
for (auto network_flow_endpoint : network_flow_endpoints) {
if (comma_skip) {
comma_skip = false;
} else {
stream << ",";
}
stream << network_flow_endpoint;
}
stream << "]}";
RCLCPP_INFO(
this->get_logger(), "%s",
stream.str().c_str());
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_1_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_2_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalSubscriberWithUniqueNetworkFlowEndpoints>());
rclcpp::shutdown();
return 0;
}

0 comments on commit c378340

Please sign in to comment.