Skip to content

Commit

Permalink
Refs #21094: Subscriber refactor
Browse files Browse the repository at this point in the history
Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>
  • Loading branch information
elianalf committed Jun 3, 2024
1 parent c09a8fe commit 6e36fe4
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 123 deletions.
152 changes: 63 additions & 89 deletions examples/cpp/custom_payload_pool/SubscriberApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,103 +19,77 @@

#include "SubscriberApp.hpp"

#include <condition_variable>
#include <csignal>

#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>

using namespace eprosima::fastdds::dds;

std::atomic<bool> CustomPayloadPoolDataSubscriber::stop_(false);
std::mutex CustomPayloadPoolDataSubscriber::terminate_cv_mtx_;
std::condition_variable CustomPayloadPoolDataSubscriber::terminate_cv_;

CustomPayloadPoolDataSubscriber::CustomPayloadPoolDataSubscriber(
std::shared_ptr<CustomPayloadPool> payload_pool)
: payload_pool_(payload_pool)
, participant_(nullptr)
namespace eprosima {
namespace fastdds {
namespace examples {
namespace custom_payload_pool {

SubscriberApp::SubscriberApp(
const CLIParser::subscriber_config& config,
const std::string& topic_name)
: participant_(nullptr)
, subscriber_(nullptr)
, topic_(nullptr)
, reader_(nullptr)
, type_(new CustomPayloadPoolDataPubSubType())
, matched_(0)
, samples_(0)
, max_samples_(0)
{

}

bool CustomPayloadPoolDataSubscriber::is_stopped()
, samples_(config.samples)
, received_samples_(0)
, stop_(false)
{
return stop_;
}
payload_pool_ = std::make_shared<CustomPayloadPool>();

void CustomPayloadPoolDataSubscriber::stop()
{
stop_ = true;
terminate_cv_.notify_all();
}

bool CustomPayloadPoolDataSubscriber::init()
{
DomainParticipantQos pqos = PARTICIPANT_QOS_DEFAULT;
pqos.name("CustomPayloadPoolDataSubscriber");
// Create the participant
auto factory = DomainParticipantFactory::get_instance();

participant_ = factory->create_participant(0, pqos);

participant_ = factory->create_participant_with_default_profile(nullptr, StatusMask::none());
if (participant_ == nullptr)
{
return false;
throw std::runtime_error("Participant initialization failed");
}

/* Register the type */
// Register the type
type_.register_type(participant_);

/* Create the subscriber */
subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);

// Create the subscriber
SubscriberQos sub_qos = SUBSCRIBER_QOS_DEFAULT;
participant_->get_default_subscriber_qos(sub_qos);
subscriber_ = participant_->create_subscriber(sub_qos, nullptr, StatusMask::none());
if (subscriber_ == nullptr)
{
return false;
throw std::runtime_error("Subscriber initialization failed");
}

/* Create the topic */
topic_ = participant_->create_topic(
"CustomPayloadPoolTopic",
type_.get_type_name(),
TOPIC_QOS_DEFAULT);

// Create the topic
TopicQos topic_qos = TOPIC_QOS_DEFAULT;
participant_->get_default_topic_qos(topic_qos);
topic_ = participant_->create_topic(topic_name, type_.get_type_name(), topic_qos);
if (topic_ == nullptr)
{
return false;
throw std::runtime_error("Topic initialization failed");
}

/* Create the reader */
DataReaderQos rqos = DATAREADER_QOS_DEFAULT;
rqos.reliability().kind = RELIABLE_RELIABILITY_QOS;

reader_ = subscriber_->create_datareader(topic_, rqos, this, StatusMask::all(), payload_pool_);

// Create the reader
DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
subscriber_->get_default_datareader_qos(reader_qos);
reader_ = subscriber_->create_datareader(topic_, reader_qos, this, StatusMask::all(), payload_pool_);
if (reader_ == nullptr)
{
return false;
throw std::runtime_error("DataReader initialization failed");
}

// Register SIGINT signal handler to stop thread execution
signal(SIGINT, [](int /*signum*/)
{
std::cout << "SIGINT received, stopping subscriber execution." << std::endl;
CustomPayloadPoolDataSubscriber::stop();
});

return true;
}

CustomPayloadPoolDataSubscriber::~CustomPayloadPoolDataSubscriber()
SubscriberApp::~SubscriberApp()
{
if (participant_ != nullptr)
{
Expand All @@ -124,18 +98,16 @@ CustomPayloadPoolDataSubscriber::~CustomPayloadPoolDataSubscriber()
}
}

void CustomPayloadPoolDataSubscriber::on_subscription_matched(
void SubscriberApp::on_subscription_matched(
DataReader*,
const SubscriptionMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched_ = info.total_count;
std::cout << "Subscriber matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched_ = info.total_count;
std::cout << "Subscriber unmatched." << std::endl;
}
else
Expand All @@ -145,45 +117,47 @@ void CustomPayloadPoolDataSubscriber::on_subscription_matched(
}
}

void CustomPayloadPoolDataSubscriber::on_data_available(
void SubscriberApp::on_data_available(
DataReader* reader)
{
SampleInfo info;
if (reader->take_next_sample(&hello_, &info) == RETCODE_OK)
while ((!is_stopped()) && (RETCODE_OK == reader->take_next_sample(&hello_, &info)))
{
if (info.instance_state == ALIVE_INSTANCE_STATE)
if ((info.instance_state == ALIVE_INSTANCE_STATE) && info.valid_data)
{
samples_++;
// Print your structure data here.
std::cout << "Message [" << samples_ << "] of " << hello_.message() << " " << hello_.index()
<< " RECEIVED" << std::endl;

if (max_samples_ > 0 && (samples_ >= max_samples_))
received_samples_++;
// Print Hello world message data
std::cout << "Message: '" << hello_.message() << "' with index: '" << hello_.index()
<< "' RECEIVED" << std::endl;
if (samples_ > 0 && (received_samples_ >= samples_))
{
stop();
}
}
}
}

bool CustomPayloadPoolDataSubscriber::run(
uint32_t samples)
void SubscriberApp::run()
{
max_samples_ = samples;
stop_ = false;
if (samples == 0)
{
std::cout << "Subscriber running. Please press Ctrl+C to stop the Subscriber at any time." << std::endl;
}
else
{
std::cout << "Subscriber running until " << samples << " samples have been received" << std::endl;
}

std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
terminate_cv_.wait(lck, []
terminate_cv_.wait(lck, [&]
{
return is_stopped();
});
return is_stopped();
}

bool SubscriberApp::is_stopped()
{
return stop_.load();
}

void SubscriberApp::stop()
{
stop_.store(true);
terminate_cv_.notify_all();
}

} // namespace custom_payload_pool
} // namespace examples
} // namespace fastdds
} // namespace eprosima
77 changes: 43 additions & 34 deletions examples/cpp/custom_payload_pool/SubscriberApp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*
*/

#ifndef CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_
#define CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_
#ifndef _FASTDDS_CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_HPP_
#define _FASTDDS_CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_HPP_

#include <condition_variable>
#include <mutex>
Expand All @@ -28,68 +28,77 @@
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>

#include "Application.hpp"
#include "CLIParser.hpp"
#include "CustomPayloadPool.hpp"
#include "CustomPayloadPoolDataPubSubTypes.h"

class CustomPayloadPoolDataSubscriber : private eprosima::fastdds::dds::DataReaderListener
using namespace eprosima::fastdds::dds;
namespace eprosima {
namespace fastdds {
namespace examples {
namespace custom_payload_pool {
class SubscriberApp : public Application, public DataReaderListener
{
public:

CustomPayloadPoolDataSubscriber(
std::shared_ptr<CustomPayloadPool> payload_pool);
SubscriberApp(
const CLIParser::subscriber_config& config,
const std::string& topic_name);

virtual ~CustomPayloadPoolDataSubscriber();

//!Initialize the subscriber
bool init();

//!Run the subscriber until all samples have been received.
bool run(
uint32_t samples);

private:
virtual ~SubscriberApp();

//! Subscription callback
void on_data_available(
eprosima::fastdds::dds::DataReader* reader) override;
DataReader* reader) override;

//! Subscriber matched method
void on_subscription_matched(
eprosima::fastdds::dds::DataReader* reader,
const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override;
DataReader* reader,
const SubscriptionMatchedStatus& info) override;

//! Return the current state of execution
static bool is_stopped();
//!Run the subscriber until all samples have been received.
void run() override;

//! Trigger the end of execution
static void stop();
void stop() override;

private:

//! Return the current state of execution
bool is_stopped();

CustomPayloadPoolData hello_;

std::shared_ptr<CustomPayloadPool> payload_pool_;

eprosima::fastdds::dds::DomainParticipant* participant_;

eprosima::fastdds::dds::Subscriber* subscriber_;
DomainParticipant* participant_;

eprosima::fastdds::dds::Topic* topic_;
Subscriber* subscriber_;

eprosima::fastdds::dds::DataReader* reader_;
Topic* topic_;

eprosima::fastdds::dds::TypeSupport type_;
DataReader* reader_;

int32_t matched_;
TypeSupport type_;

uint32_t samples_;
uint16_t samples_;

uint32_t max_samples_;
uint16_t received_samples_;

//! Member used for control flow purposes
static std::atomic<bool> stop_;
std::atomic<bool> stop_;

//! Protects terminate condition variable
static std::mutex terminate_cv_mtx_;
mutable std::mutex terminate_cv_mtx_;

//! Waits during execution until SIGINT or max_messages_ samples are received
static std::condition_variable terminate_cv_;
std::condition_variable terminate_cv_;
};

#endif /* CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_H_ */
} // namespace custom_payload_pool
} // namespace examples
} // namespace fastdds
} // namespace eprosima

#endif /* _FASTDDS_CUSTOM_PAYLOAD_POOL_DATA_SUBSCRIBER_HPP_ */

0 comments on commit 6e36fe4

Please sign in to comment.