Skip to content

Commit

Permalink
iox-eclipse-iceoryx#615 renaming of blocking options and extended opt…
Browse files Browse the repository at this point in the history
…ions example

Signed-off-by: Michael Poehnl <michael.poehnl@apex.ai>
  • Loading branch information
budrus authored and marthtz committed May 12, 2021
1 parent 25e3dd0 commit 40d56d6
Show file tree
Hide file tree
Showing 17 changed files with 36 additions and 37 deletions.
3 changes: 3 additions & 0 deletions iceoryx_binding_c/include/iceoryx_binding_c/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ typedef struct
/// @brief The option whether the publisher should already be offered when creating it
bool offerOnCreate;

/// @brief describes whether a publisher blocks when subscriber queue is full
ENUM iox_SubscriberTooSlowPolicy subscriberTooSlowPolicy;

/// @brief this value will be set exclusively by `iox_pub_options_init` and is not supposed to be modified otherwise
uint64_t initCheck;
} iox_pub_options_t;
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ typedef struct
bool subscribeOnCreate;

/// @brief describes whether a publisher blocks when subscriber queue is full
ENUM iox_QueueFullPolicy receiverQueueFullPolicy;
ENUM iox_QueueFullPolicy queueFullPolicy;

/// @brief this value will be set exclusively by iox_sub_options_init and is not supposed to be modified otherwise
uint64_t initCheck;
Expand Down
3 changes: 2 additions & 1 deletion iceoryx_binding_c/source/c_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ void iox_pub_options_init(iox_pub_options_t* options)
options->historyCapacity = publisherOptions.historyCapacity;
options->nodeName = nullptr;
options->offerOnCreate = publisherOptions.offerOnCreate;
options->subscriberTooSlowPolicy = cpp2c::subscriberTooSlowPolicy(publisherOptions.subscriberTooSlowPolicy);

options->initCheck = PUBLISHER_OPTIONS_INIT_CHECK_CONSTANT;
}
Expand Down Expand Up @@ -89,7 +90,7 @@ iox_pub_t iox_pub_init(iox_pub_storage_t* self,
publisherOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
}
publisherOptions.offerOnCreate = options->offerOnCreate;
publisherOptions.deliveryQueueFullPolicy = c2cpp::subscriberTooSlowPolicy(options->deliveryQueueFullPolicy);
publisherOptions.subscriberTooSlowPolicy = c2cpp::subscriberTooSlowPolicy(options->subscriberTooSlowPolicy);
}

me->m_portData = PoshRuntime::getInstance().getMiddlewarePublisher(
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_binding_c/source/c_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void iox_sub_options_init(iox_sub_options_t* options)
options->historyRequest = subscriberOptions.historyRequest;
options->nodeName = nullptr;
options->subscribeOnCreate = subscriberOptions.subscribeOnCreate;
options->receiverQueueFullPolicy = cpp2c::QueueFullPolicy(subscriberOptions.receiverQueueFullPolicy);
options->queueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.queueFullPolicy);

options->initCheck = SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
}
Expand Down Expand Up @@ -97,7 +97,7 @@ iox_sub_t iox_sub_init(iox_sub_storage_t* self,
subscriberOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
}
subscriberOptions.subscribeOnCreate = options->subscribeOnCreate;
subscriberOptions.receiverQueueFullPolicy = c2cpp::QueueFullPolicy(options->receiverQueueFullPolicy);
subscriberOptions.queueFullPolicy = c2cpp::queueFullPolicy(options->queueFullPolicy);
}

me->m_portData =
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_binding_c/test/moduletests/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly)
sut.historyCapacity = 37;
sut.nodeName = "Dr.Gonzo";
sut.offerOnCreate = false;
sut.deliveryQueueFullPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER;
sut.subscriberTooSlowPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER;

PublisherOptions options;
// set offerOnCreate to the opposite of the expected default to check if it gets overwritten to default
Expand All @@ -363,7 +363,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly)
EXPECT_EQ(sut.historyCapacity, options.historyCapacity);
EXPECT_EQ(sut.nodeName, nullptr);
EXPECT_EQ(sut.offerOnCreate, options.offerOnCreate);
EXPECT_EQ(sut.deliveryQueueFullPolicy, cpp2c::SubscriberTooSlowPolicy(options.deliveryQueueFullPolicy));
EXPECT_EQ(sut.subscriberTooSlowPolicy, cpp2c::subscriberTooSlowPolicy(options.subscriberTooSlowPolicy));
EXPECT_TRUE(iox_pub_options_is_initialized(&sut));
}

Expand Down
4 changes: 2 additions & 2 deletions iceoryx_binding_c/test/moduletests/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly)
sut.historyRequest = 73;
sut.nodeName = "Dr.Gonzo";
sut.subscribeOnCreate = false;
sut.receiverQueueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER;
sut.queueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER;

SubscriberOptions options;
// set subscribeOnCreate to the opposite of the expected default to check if it gets overwritten to default
Expand All @@ -419,7 +419,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly)
EXPECT_EQ(sut.historyRequest, options.historyRequest);
EXPECT_EQ(sut.nodeName, nullptr);
EXPECT_EQ(sut.subscribeOnCreate, options.subscribeOnCreate);
EXPECT_EQ(sut.receiverQueueFullPolicy, cpp2c::QueueFullPolicy(options.receiverQueueFullPolicy));
EXPECT_EQ(sut.queueFullPolicy, cpp2c::queueFullPolicy(options.queueFullPolicy));
EXPECT_TRUE(iox_sub_options_is_initialized(&sut));
}

Expand Down
3 changes: 3 additions & 0 deletions iceoryx_examples/icedelivery/iox_publisher_with_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ int main()
// grouping of publishers and subscribers within a process
publisherOptions.nodeName = "Pub_Node_With_Options";

// we allow the subscribers to block the publisher if they want to ensure that no samples are lost
publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;

iox::popo::Publisher<RadarObject> publisher({"Radar", "FrontLeft", "Object"}, publisherOptions);

// we have to explicitely offer the publisher for making it visible to subscribers
Expand Down
18 changes: 5 additions & 13 deletions iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ int main()
// grouping of publishers and subscribers within a process
subscriberOptions.nodeName = "Sub_Node_With_Options";

// we request the publisher to wait for space in the queue if it is full. The publisher will be blocked then
subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER;

iox::popo::Subscriber<RadarObject> subscriber({"Radar", "FrontLeft", "Object"}, subscriberOptions);

// We have to explicitly call subscribe() otherwise the subscriber will not try to connect to publishers
Expand All @@ -66,19 +69,8 @@ int main()
// run until interrupted by Ctrl-C
while (!killswitch)
{
if (subscriber.getSubscriptionState() == iox::SubscribeState::SUBSCRIBED)
{
bool hasMoreSamples = true;
// Since we are checking only every second but the publisher is sending every
// 400ms a new sample we will receive here more then one sample.
do
{
subscriber.take()
.and_then([](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; })
.or_else([&](auto&) { hasMoreSamples = false; });
} while (hasMoreSamples);
}
std::cout << std::endl;
subscriber.take().and_then(
[](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; });

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct PublisherOptions
bool offerOnCreate{true};

/// @brief The option whether the publisher should block when the subscriber queue is full
SubscriberTooSlowPolicy deliveryQueueFullPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA};
SubscriberTooSlowPolicy subscriberTooSlowPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA};
};

} // namespace popo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SubscriberOptions
bool subscribeOnCreate{true};

/// @brief The option whether the publisher should block when the subscriber queue is full
QueueFullPolicy receiverQueueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA};
QueueFullPolicy queueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA};
};

} // namespace popo
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_posh/include/iceoryx_posh/roudi/port_pool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ inline iox::popo::SubscriberPortData* PortPool::constructSubscriber(const capro:
return m_portPoolData->m_subscriberPortMembers.insert(
serviceDescription,
runtimeName,
(subscriberOptions.receiverQueueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA)
(subscriberOptions.queueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA)
? cxx::VariantQueueTypes::SoFi_MultiProducerSingleConsumer
: cxx::VariantQueueTypes::FiFo_MultiProducerSingleConsumer,
subscriberOptions,
Expand All @@ -46,7 +46,7 @@ inline iox::popo::SubscriberPortData* PortPool::constructSubscriber(const capro:
return m_portPoolData->m_subscriberPortMembers.insert(
serviceDescription,
runtimeName,
(subscriberOptions.receiverQueueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA)
(subscriberOptions.queueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA)
? cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer
: cxx::VariantQueueTypes::FiFo_SingleProducerSingleConsumer,
subscriberOptions,
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_posh/source/popo/ports/publisher_port_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ PublisherPortData::PublisherPortData(const capro::ServiceDescription& serviceDes
const mepoo::MemoryInfo& memoryInfo) noexcept
: BasePortData(serviceDescription, runtimeName, publisherOptions.nodeName)
, m_chunkSenderData(
memoryManager, publisherOptions.deliveryQueueFullPolicy, publisherOptions.historyCapacity, memoryInfo)
memoryManager, publisherOptions.subscriberTooSlowPolicy, publisherOptions.historyCapacity, memoryInfo)
, m_offeringRequested(publisherOptions.offerOnCreate)
{
}
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_posh/source/popo/ports/subscriber_port_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ SubscriberPortData::SubscriberPortData(const capro::ServiceDescription& serviceD
const SubscriberOptions& subscriberOptions,
const mepoo::MemoryInfo& memoryInfo) noexcept
: BasePortData(serviceDescription, runtimeName, subscriberOptions.nodeName)
, m_chunkReceiverData(queueType, subscriberOptions.receiverQueueFullPolicy, memoryInfo)
, m_chunkReceiverData(queueType, subscriberOptions.queueFullPolicy, memoryInfo)
, m_historyRequest(subscriberOptions.historyRequest)
, m_subscribeRequested(subscriberOptions.subscribeOnCreate)
{
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
options.historyCapacity = std::stoull(message.getElementAtIndex(3));
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(4));
options.offerOnCreate = (0U == std::stoull(message.getElementAtIndex(5))) ? false : true;
options.deliveryQueueFullPolicy =
options.subscriberTooSlowPolicy =
static_cast<popo::SubscriberTooSlowPolicy>(std::stoul(message.getElementAtIndex(6)));

m_prcMgr->addPublisherForProcess(
Expand All @@ -251,7 +251,7 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
options.queueCapacity = std::stoull(message.getElementAtIndex(4));
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5));
options.subscribeOnCreate = (0U == std::stoull(message.getElementAtIndex(6))) ? false : true;
options.receiverQueueFullPolicy =
options.queueFullPolicy =
static_cast<popo::QueueFullPolicy>(std::stoul(message.getElementAtIndex(7)));

m_prcMgr->addSubscriberForProcess(
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_posh/source/runtime/posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ PublisherPortUserType::MemberType_t* PoshRuntime::getMiddlewarePublisher(const c
sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_PUBLISHER) << m_appName
<< static_cast<cxx::Serialization>(service).toString() << std::to_string(options.historyCapacity)
<< options.nodeName << std::to_string(options.offerOnCreate)
<< std::to_string(static_cast<uint8_t>(options.deliveryQueueFullPolicy))
<< std::to_string(static_cast<uint8_t>(options.subscriberTooSlowPolicy))
<< static_cast<cxx::Serialization>(portConfigInfo).toString();

auto maybePublisher = requestPublisherFromRoudi(sendBuffer);
Expand Down Expand Up @@ -295,7 +295,7 @@ PoshRuntime::getMiddlewareSubscriber(const capro::ServiceDescription& service,
sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_SUBSCRIBER) << m_appName
<< static_cast<cxx::Serialization>(service).toString() << std::to_string(options.historyRequest)
<< std::to_string(options.queueCapacity) << options.nodeName << std::to_string(options.subscribeOnCreate)
<< std::to_string(static_cast<uint8_t>(options.receiverQueueFullPolicy))
<< std::to_string(static_cast<uint8_t>(options.queueFullPolicy))
<< static_cast<cxx::Serialization>(portConfigInfo).toString();

auto maybeSubscriber = requestSubscriberFromRoudi(sendBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PublisherSubscriberCommunication_test : public RouDi_GTest
createPublisher(const SubscriberTooSlowPolicy policy = SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA)
{
iox::popo::PublisherOptions options;
options.deliveryQueueFullPolicy = policy;
options.subscriberTooSlowPolicy = policy;
return std::make_unique<iox::popo::Publisher<T>>(m_serviceDescription, options);
}

Expand All @@ -71,7 +71,7 @@ class PublisherSubscriberCommunication_test : public RouDi_GTest
const uint64_t queueCapacity = SubscriberPortData::ChunkQueueData_t::MAX_CAPACITY)
{
iox::popo::SubscriberOptions options;
options.receiverQueueFullPolicy = policy;
options.queueFullPolicy = policy;
options.queueCapacity = queueCapacity;
return std::make_unique<iox::popo::Subscriber<T>>(m_serviceDescription, options);
}
Expand Down
8 changes: 4 additions & 4 deletions iceoryx_posh/test/moduletests/test_posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithoutExplicitlySetQueueFullPoli
TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToDiscardOldestDataLeadsToDiscardOldestData)
{
iox::popo::PublisherOptions publisherOptions;
publisherOptions.deliveryQueueFullPolicy = iox::popo::SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA;
publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA;

const auto publisherPortData = m_runtime->getMiddlewarePublisher(iox::capro::ServiceDescription(90U, 130U, 1550U),
publisherOptions,
Expand All @@ -405,7 +405,7 @@ TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToDiscardOl
TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToWaitForSubscriberLeadsToWaitForSubscriber)
{
iox::popo::PublisherOptions publisherOptions;
publisherOptions.deliveryQueueFullPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;
publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;

const auto publisherPortData = m_runtime->getMiddlewarePublisher(
iox::capro::ServiceDescription(18U, 31U, 400U), publisherOptions, iox::runtime::PortConfigInfo(11U, 22U, 33U));
Expand Down Expand Up @@ -521,7 +521,7 @@ TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithoutExplicitlySetQueueFullPol
TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToDiscardOldestDataLeadsToDiscardOldestData)
{
iox::popo::SubscriberOptions subscriberOptions;
subscriberOptions.receiverQueueFullPolicy = iox::popo::QueueFullPolicy::DISCARD_OLDEST_DATA;
subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::DISCARD_OLDEST_DATA;

const auto subscriberPortData = m_runtime->getMiddlewareSubscriber(iox::capro::ServiceDescription(90U, 130U, 1550U),
subscriberOptions,
Expand All @@ -534,7 +534,7 @@ TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToDiscardO
TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToBlockPublisherLeadsToBlockPublisher)
{
iox::popo::SubscriberOptions subscriberOptions;
subscriberOptions.receiverQueueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER;
subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER;

const auto subscriberPortData = m_runtime->getMiddlewareSubscriber(
iox::capro::ServiceDescription(18U, 31U, 400U), subscriberOptions, iox::runtime::PortConfigInfo(11U, 22U, 33U));
Expand Down

0 comments on commit 40d56d6

Please sign in to comment.